2 * Copyright © 2017 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netvirt.elan.l2gw.listeners;
10 import static org.opendaylight.genius.infra.Datastore.CONFIGURATION;
11 import static org.opendaylight.genius.infra.Datastore.OPERATIONAL;
13 import com.google.common.collect.Sets;
14 import java.util.Collections;
15 import java.util.HashMap;
16 import java.util.HashSet;
19 import java.util.function.Predicate;
20 import javax.annotation.PostConstruct;
21 import javax.inject.Inject;
22 import javax.inject.Singleton;
23 import org.eclipse.jdt.annotation.Nullable;
24 import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
25 import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
26 import org.opendaylight.genius.utils.batching.ResourceBatchingManager;
27 import org.opendaylight.genius.utils.hwvtep.HwvtepNodeHACache;
28 import org.opendaylight.genius.utils.hwvtep.HwvtepSouthboundUtils;
29 import org.opendaylight.infrautils.jobcoordinator.JobCoordinator;
30 import org.opendaylight.infrautils.utils.concurrent.LoggingFutures;
31 import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
32 import org.opendaylight.mdsal.binding.api.DataBroker;
33 import org.opendaylight.mdsal.binding.api.DataObjectModification;
34 import org.opendaylight.mdsal.binding.api.DataTreeModification;
35 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
36 import org.opendaylight.netvirt.elan.cache.ElanInstanceCache;
37 import org.opendaylight.netvirt.elan.l2gw.ha.HwvtepHAUtil;
38 import org.opendaylight.netvirt.elan.l2gw.ha.listeners.HAOpClusteredListener;
39 import org.opendaylight.netvirt.elan.l2gw.recovery.impl.L2GatewayServiceRecoveryHandler;
40 import org.opendaylight.netvirt.elan.l2gw.utils.ElanL2GatewayUtils;
41 import org.opendaylight.netvirt.elanmanager.utils.ElanL2GwCacheUtils;
42 import org.opendaylight.netvirt.neutronvpn.api.l2gw.L2GatewayDevice;
43 import org.opendaylight.serviceutils.srm.RecoverableListener;
44 import org.opendaylight.serviceutils.srm.ServiceRecoveryRegistry;
45 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.IetfYangUtil;
46 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.MacAddress;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.elan.rev150602.elan.instances.ElanInstance;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalAugmentation;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.LocalUcastMacs;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.LogicalSwitches;
51 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
52 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
57 public class LocalUcastMacListener extends ChildListener<Node, LocalUcastMacs, String>
58 implements ClusteredDataTreeChangeListener<Node>, RecoverableListener {
60 private static final Logger LOG = LoggerFactory.getLogger(LocalUcastMacListener.class);
61 public static final String NODE_CHECK = "physical";
63 private static final Predicate<InstanceIdentifier<Node>> IS_PS_NODE_IID =
64 (iid) -> iid.firstKeyOf(Node.class).getNodeId().getValue().contains(NODE_CHECK);
66 private final ManagedNewTransactionRunner txRunner;
67 private final ElanL2GatewayUtils elanL2GatewayUtils;
68 private final HAOpClusteredListener haOpClusteredListener;
69 private final JobCoordinator jobCoordinator;
70 private final ElanInstanceCache elanInstanceCache;
71 private final HwvtepNodeHACache hwvtepNodeHACache;
74 public LocalUcastMacListener(final DataBroker dataBroker,
75 final HAOpClusteredListener haOpClusteredListener,
76 final ElanL2GatewayUtils elanL2GatewayUtils,
77 final JobCoordinator jobCoordinator,
78 final ElanInstanceCache elanInstanceCache,
79 final HwvtepNodeHACache hwvtepNodeHACache,
80 final L2GatewayServiceRecoveryHandler l2GatewayServiceRecoveryHandler,
81 final ServiceRecoveryRegistry serviceRecoveryRegistry) {
82 super(dataBroker, false);
83 this.txRunner = new ManagedNewTransactionRunnerImpl(dataBroker);
84 this.elanL2GatewayUtils = elanL2GatewayUtils;
85 this.haOpClusteredListener = haOpClusteredListener;
86 this.jobCoordinator = jobCoordinator;
87 this.elanInstanceCache = elanInstanceCache;
88 this.hwvtepNodeHACache = hwvtepNodeHACache;
89 serviceRecoveryRegistry.addRecoverableListener(l2GatewayServiceRecoveryHandler.buildServiceRegistryKey(), this);
94 public void init() throws Exception {
95 ResourceBatchingManager.getInstance().registerDefaultBatchHandlers(this.dataBroker);
101 @SuppressWarnings("all")
102 public void registerListener() {
104 LOG.info("Registering LocalUcastMacListener");
105 registerListener(LogicalDatastoreType.OPERATIONAL, getParentWildCardPath());
106 } catch (Exception e) {
107 LOG.error("Local Ucast Mac register listener error");
111 public void deregisterListener() {
112 LOG.info("Deregistering LocalUcastMacListener");
117 protected boolean proceed(final InstanceIdentifier<Node> parent) {
118 return isNotHAChild(parent);
121 protected String getElanName(final LocalUcastMacs mac) {
122 return mac.getLogicalSwitchRef().getValue().firstKeyOf(LogicalSwitches.class).getHwvtepNodeName().getValue();
126 protected String getGroup(final LocalUcastMacs localUcastMacs) {
127 return getElanName(localUcastMacs);
131 protected void onUpdate(final Map<String, Map<InstanceIdentifier, LocalUcastMacs>> updatedMacsGrouped,
132 final Map<String, Map<InstanceIdentifier, LocalUcastMacs>> deletedMacsGrouped) {
133 updatedMacsGrouped.forEach((key, value) -> value.forEach(this::added));
134 deletedMacsGrouped.forEach((key, value) -> value.forEach(this::removed));
137 public void removed(final InstanceIdentifier<LocalUcastMacs> identifier, final LocalUcastMacs macRemoved) {
138 String hwvtepNodeId = identifier.firstKeyOf(Node.class).getNodeId().getValue();
139 MacAddress macAddress = IetfYangUtil.INSTANCE.canonizeMacAddress(macRemoved.getMacEntryKey());
141 LOG.trace("LocalUcastMacs {} removed from {}", macAddress.getValue(), hwvtepNodeId);
143 ResourceBatchingManager.getInstance().delete(ResourceBatchingManager.ShardResource.CONFIG_TOPOLOGY,
146 String elanName = getElanName(macRemoved);
148 jobCoordinator.enqueueJob(elanName + HwvtepHAUtil.L2GW_JOB_KEY ,
150 L2GatewayDevice elanL2GwDevice = ElanL2GwCacheUtils.getL2GatewayDeviceFromCache(elanName,
152 if (elanL2GwDevice == null) {
153 LOG.warn("Could not find L2GatewayDevice for ELAN: {}, nodeID:{} from cache",
154 elanName, hwvtepNodeId);
158 elanL2GwDevice.removeUcastLocalMac(macRemoved);
159 ElanInstance elanInstance = elanInstanceCache.get(elanName).orElse(null);
160 elanL2GatewayUtils.unInstallL2GwUcastMacFromL2gwDevices(elanName, elanL2GwDevice,
161 Collections.singletonList(macAddress));
162 elanL2GatewayUtils.unInstallL2GwUcastMacFromElanDpns(elanInstance, elanL2GwDevice,
163 Collections.singletonList(macAddress));
168 public void added(final InstanceIdentifier<LocalUcastMacs> identifier, final LocalUcastMacs macAdded) {
169 ResourceBatchingManager.getInstance().put(ResourceBatchingManager.ShardResource.CONFIG_TOPOLOGY,
170 identifier, macAdded);
172 String hwvtepNodeId = identifier.firstKeyOf(Node.class).getNodeId().getValue();
173 String macAddress = IetfYangUtil.INSTANCE.canonizeMacAddress(macAdded.getMacEntryKey()).getValue();
174 String elanName = getElanName(macAdded);
176 LOG.trace("LocalUcastMacs {} added to {}", macAddress, hwvtepNodeId);
178 ElanInstance elan = elanInstanceCache.get(elanName).orElse(null);
180 LOG.warn("Could not find ELAN for mac {} being added", macAddress);
183 jobCoordinator.enqueueJob(elanName + HwvtepHAUtil.L2GW_JOB_KEY,
185 L2GatewayDevice elanL2GwDevice =
186 ElanL2GwCacheUtils.getL2GatewayDeviceFromCache(elanName, hwvtepNodeId);
187 if (elanL2GwDevice == null) {
188 LOG.warn("Could not find L2GatewayDevice for ELAN: {}, nodeID:{} from cache",
189 elanName, hwvtepNodeId);
193 elanL2GwDevice.addUcastLocalMac(macAdded);
194 elanL2GatewayUtils.installL2GwUcastMacInElan(elan, elanL2GwDevice, macAddress, macAdded, null);
200 protected Map<InstanceIdentifier<LocalUcastMacs>, DataObjectModification<LocalUcastMacs>> getChildMod(
201 final InstanceIdentifier<Node> parentIid,
202 final DataObjectModification<Node> mod) {
204 Map<InstanceIdentifier<LocalUcastMacs>, DataObjectModification<LocalUcastMacs>> result = new HashMap<>();
205 DataObjectModification<HwvtepGlobalAugmentation> aug = mod.getModifiedAugmentation(
206 HwvtepGlobalAugmentation.class);
207 if (aug != null && getModificationType(aug) != null) {
208 aug.getModifiedChildren().stream()
209 .filter(childMod -> getModificationType(childMod) != null)
210 .filter(childMod -> childMod.getDataType() == LocalUcastMacs.class)
211 .forEach(childMod -> {
212 LocalUcastMacs afterMac = (LocalUcastMacs) childMod.getDataAfter();
213 LocalUcastMacs mac = afterMac != null ? afterMac : (LocalUcastMacs)childMod.getDataBefore();
214 InstanceIdentifier<LocalUcastMacs> iid = parentIid
215 .augmentation(HwvtepGlobalAugmentation.class)
216 .child(LocalUcastMacs.class, mac.key());
217 result.put(iid, (DataObjectModification<LocalUcastMacs>) childMod);
224 protected void onParentAdded(final DataTreeModification<Node> modification) {
225 InstanceIdentifier<Node> nodeIid = modification.getRootPath().getRootIdentifier();
226 if (IS_PS_NODE_IID.test(nodeIid)) {
229 // TODO skitt we're only using read transactions here
230 LoggingFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(OPERATIONAL,
231 tx -> haOpClusteredListener.onGlobalNodeAdd(nodeIid, modification.getRootNode().getDataAfter(), tx)), LOG,
232 "Error processing added parent");
233 if (!isHAChild(nodeIid)) {
234 LoggingFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION, tx -> {
235 LOG.trace("On parent add {}", nodeIid);
236 Node operNode = modification.getRootNode().getDataAfter();
237 Set<LocalUcastMacs> configMacs = getMacs(tx.read(nodeIid).get().orElse(null));
238 Set<LocalUcastMacs> operMacs = getMacs(operNode);
239 Set<LocalUcastMacs> staleMacs = Sets.difference(configMacs, operMacs);
240 staleMacs.forEach(staleMac -> removed(getMacIid(nodeIid, staleMac), staleMac));
241 }), LOG, "Error processing added parent");
245 InstanceIdentifier<LocalUcastMacs> getMacIid(InstanceIdentifier<Node> nodeIid, LocalUcastMacs mac) {
246 return nodeIid.augmentation(HwvtepGlobalAugmentation.class)
247 .child(LocalUcastMacs.class, mac.key());
250 private static Set<LocalUcastMacs> getMacs(@Nullable Node node) {
252 HwvtepGlobalAugmentation augmentation = node.augmentation(HwvtepGlobalAugmentation.class);
253 if (augmentation != null && augmentation.nonnullLocalUcastMacs() != null) {
254 return new HashSet<>(augmentation.nonnullLocalUcastMacs().values());
257 return Collections.emptySet();
261 protected void onParentRemoved(InstanceIdentifier<Node> parent) {
262 if (IS_PS_NODE_IID.test(parent)) {
265 LOG.trace("on parent removed {}", parent);
269 protected InstanceIdentifier<Node> getParentWildCardPath() {
270 return HwvtepSouthboundUtils.createHwvtepTopologyInstanceIdentifier()
274 private boolean isNotHAChild(InstanceIdentifier<Node> nodeId) {
275 return !hwvtepNodeHACache.isHAEnabledDevice(nodeId)
276 && !nodeId.firstKeyOf(Node.class).getNodeId().getValue().contains(HwvtepHAUtil.PHYSICALSWITCH);
279 private boolean isHAChild(InstanceIdentifier<Node> nodeId) {
280 return hwvtepNodeHACache.isHAEnabledDevice(nodeId);