--- /dev/null
+/*
+ * Copyright (c) 2017 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netvirt.elan.l2gw.ha.listeners;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.genius.utils.hwvtep.HwvtepSouthboundConstants;
+import org.opendaylight.netvirt.elan.l2gw.ha.commands.LocalMcastCmd;
+import org.opendaylight.netvirt.elan.l2gw.ha.commands.LocalUcastCmd;
+import org.opendaylight.netvirt.elan.l2gw.ha.commands.LogicalSwitchesCmd;
+import org.opendaylight.netvirt.elan.l2gw.ha.commands.MergeCommand;
+import org.opendaylight.netvirt.elan.l2gw.ha.commands.PhysicalLocatorCmd;
+import org.opendaylight.netvirt.elan.l2gw.ha.commands.RemoteMcastCmd;
+import org.opendaylight.netvirt.elan.l2gw.ha.commands.RemoteUcastCmd;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalAttributes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalAugmentation;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.LocalMcastMacs;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.LocalUcastMacs;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.LogicalSwitches;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.RemoteMcastMacs;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.RemoteUcastMacs;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPoint;
+import org.opendaylight.yangtools.yang.binding.ChildOf;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.Identifiable;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HAListeners implements AutoCloseable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HAListeners.class);
+
+ private final DataBroker broker;
+ private final List<HwvtepNodeDataListener> listeners = new ArrayList<>();
+ private final InstanceIdentifier physicalPortIid = InstanceIdentifier.create(NetworkTopology.class)
+ .child(Topology.class, new TopologyKey(HwvtepSouthboundConstants.HWVTEP_TOPOLOGY_ID))
+ .child(Node.class).child(TerminationPoint.class);
+
+ public HAListeners(DataBroker broker) {
+ this.broker = broker;
+ registerListener(LocalMcastMacs.class, new LocalMcastCmd());
+ registerListener(RemoteMcastMacs.class, new RemoteMcastCmd());
+ registerListener(LocalUcastMacs.class, new LocalUcastCmd());
+ registerListener(RemoteUcastMacs.class, new RemoteUcastCmd());
+ registerListener(LogicalSwitches.class, new LogicalSwitchesCmd());
+
+ PhysicalLocatorCmd physicalLocatorCmd = new PhysicalLocatorCmd();
+ listeners.add(new PhysicalLocatorListener(broker, PhysicalLocatorListener.class, physicalLocatorCmd,
+ LogicalDatastoreType.CONFIGURATION));
+ listeners.add(new PhysicalLocatorListener(broker, PhysicalLocatorListener.class, physicalLocatorCmd,
+ LogicalDatastoreType.OPERATIONAL));
+ }
+
+ public void init() {
+ }
+
+ @Override
+ public void close() throws Exception {
+ for (HwvtepNodeDataListener listener : listeners) {
+ listener.close();
+ }
+ }
+
+ private <T extends ChildOf<HwvtepGlobalAttributes>> void registerListener(Class<T> clazz,
+ MergeCommand mergeCommand) {
+ listeners.add(new GlobalAugmentationListener(broker, clazz, HwvtepNodeDataListener.class, mergeCommand,
+ LogicalDatastoreType.CONFIGURATION));
+ listeners.add(new GlobalAugmentationListener(broker, clazz, HwvtepNodeDataListener.class, mergeCommand,
+ LogicalDatastoreType.OPERATIONAL));
+ }
+
+ class GlobalAugmentationListener<T extends DataObject & ChildOf<HwvtepGlobalAttributes> & Identifiable>
+ extends HwvtepNodeDataListener<T> {
+
+ GlobalAugmentationListener(DataBroker broker, Class<T> clazz, Class eventClazz,
+ MergeCommand mergeCommand, LogicalDatastoreType datastoreType) {
+ super(broker, clazz, eventClazz, mergeCommand, datastoreType);
+ }
+
+ protected InstanceIdentifier<T> getWildCardPath() {
+ return InstanceIdentifier.create(NetworkTopology.class)
+ .child(Topology.class, new TopologyKey(HwvtepSouthboundConstants.HWVTEP_TOPOLOGY_ID))
+ .child(Node.class).augmentation(HwvtepGlobalAugmentation.class).child(clazz);
+ }
+ }
+
+ class PhysicalLocatorListener extends HwvtepNodeDataListener<TerminationPoint> {
+
+ PhysicalLocatorListener(DataBroker broker, Class eventClazz,
+ MergeCommand mergeCommand, LogicalDatastoreType datastoreType) {
+ super(broker, TerminationPoint.class, eventClazz, mergeCommand, datastoreType);
+ }
+
+ @Override
+ protected InstanceIdentifier getWildCardPath() {
+ return physicalPortIid;
+ }
+
+ @Override
+ protected void remove(InstanceIdentifier<TerminationPoint> identifier, TerminationPoint dataRemoved) {
+ if (!isGlobalNode(identifier)) {
+ return;
+ }
+ super.remove(identifier, dataRemoved);
+ }
+
+ @Override
+ protected void update(InstanceIdentifier<TerminationPoint> identifier, TerminationPoint before,
+ TerminationPoint after) {
+ if (!isGlobalNode(identifier)) {
+ return;
+ }
+ super.update(identifier, before, after);
+ }
+
+ @Override
+ protected void add(InstanceIdentifier<TerminationPoint> identifier, TerminationPoint added) {
+ if (!isGlobalNode(identifier)) {
+ return;
+ }
+ super.remove(identifier, added);
+ }
+
+ boolean isGlobalNode(InstanceIdentifier<TerminationPoint> identifier) {
+ return !identifier.firstKeyOf(Node.class).getNodeId().getValue()
+ .contains(HwvtepSouthboundConstants.PSWITCH_URI_PREFIX);
+ }
+ }
+}
/*
- * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
+ * Copyright (c) 2017 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.genius.datastoreutils.AsyncDataTreeChangeListenerBase;
import org.opendaylight.genius.utils.hwvtep.HwvtepHACache;
+import org.opendaylight.genius.utils.hwvtep.HwvtepSouthboundConstants;
import org.opendaylight.netvirt.elan.l2gw.ha.HwvtepHAUtil;
import org.opendaylight.netvirt.elan.l2gw.ha.commands.SwitchesCmd;
import org.opendaylight.netvirt.elan.l2gw.ha.handlers.HAEventHandler;
import org.opendaylight.netvirt.elan.l2gw.ha.handlers.IHAEventHandler;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalAugmentation;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.Managers;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.Switches;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.managers.ManagerOtherConfigs;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
static HwvtepHACache hwvtepHACache = HwvtepHACache.getInstance();
- IHAEventHandler haEventHandler;
+ private final IHAEventHandler haEventHandler;
- Map<String, Boolean> availableGlobalNodes = new HashMap<>();
- Map<String, Boolean> availablePsNodes = new HashMap<>();
+ private final Map<String, Boolean> availableGlobalNodes = new HashMap<>();
+ private final Map<String, Boolean> availablePsNodes = new HashMap<>();
+ private ManagerListener managerListener;
void clearNodeAvailability(InstanceIdentifier<Node> key) {
String id = key.firstKeyOf(Node.class).getNodeId().getValue();
public HAOpNodeListener(DataBroker db, HAEventHandler haEventHandler) throws Exception {
super(OPERATIONAL, db);
this.haEventHandler = haEventHandler;
+ this.managerListener = new ManagerListener(Managers.class, ManagerListener.class);
LOG.info("Registering HwvtepDataChangeListener for operational nodes");
}
+ @Override
+ public void close() throws Exception {
+ super.close();
+ if (managerListener != null) {
+ managerListener.close();
+ }
+ }
+
@Override
void onGlobalNodeAdd(InstanceIdentifier<Node> childPath,
Node childNode,
}
}
+ //Update on global node has been taken care by HAListeners as per perf improvement
@Override
void onGlobalNodeUpdate(InstanceIdentifier<Node> childPath,
Node updatedChildNode,
Node originalChildNode,
ReadWriteTransaction tx) throws ReadFailedException {
- String oldHAId = HwvtepHAUtil.getHAIdFromManagerOtherConfig(originalChildNode);
+ /* String oldHAId = HwvtepHAUtil.getHAIdFromManagerOtherConfig(originalChildNode);
if (!Strings.isNullOrEmpty(oldHAId)) { //was already ha child
InstanceIdentifier<Node> haPath = hwvtepHACache.getParent(childPath);
haEventHandler.copyChildGlobalOpUpdateToHAParent(updatedChildNode, originalChildNode, haPath, tx);
hwvtepHACache.updateConnectedNodeStatus(childPath);
LOG.info("{} became ha child ", updatedChildNode.getNodeId().getValue());
onGlobalNodeAdd(childPath, updatedChildNode, tx);
- }
+ }*/
}
@Override
}
});
}
+
+ /**
+ * ManagerListeners listens to manager updated and act in case non-ha node get converted to ha node.
+ */
+ class ManagerListener extends AsyncDataTreeChangeListenerBase<Managers, ManagerListener> {
+
+ ManagerListener(Class<Managers> clazz, Class<ManagerListener> eventClazz) {
+ super(clazz, eventClazz);
+ registerListener(LogicalDatastoreType.OPERATIONAL, db);
+ }
+
+ @Override
+ protected InstanceIdentifier<Managers> getWildCardPath() {
+ return InstanceIdentifier.create(NetworkTopology.class)
+ .child(Topology.class, new TopologyKey(HwvtepSouthboundConstants.HWVTEP_TOPOLOGY_ID)).child(Node
+ .class).augmentation(HwvtepGlobalAugmentation.class).child(clazz);
+ }
+
+ @Override
+ protected void remove(InstanceIdentifier<Managers> instanceIdentifier, Managers managers) {
+
+ }
+
+ String getHaId(Managers managers) {
+ if (managers.getManagerOtherConfigs() == null) {
+ return null;
+ }
+ for (ManagerOtherConfigs configs : managers.getManagerOtherConfigs()) {
+ if (configs.getOtherConfigKey().equals(HwvtepHAUtil.HA_ID)) {
+ return configs.getOtherConfigValue();
+ }
+ }
+ return null;
+ }
+
+ @Override
+ protected void update(InstanceIdentifier<Managers> instanceIdentifier, Managers oldData, Managers newData) {
+ String oldHAId = getHaId(oldData);
+ if (Strings.isNullOrEmpty(oldHAId)) {
+ String newHAID = getHaId(newData);
+ if (!Strings.isNullOrEmpty(newHAID)) {
+ InstanceIdentifier<Node> nodeIid = instanceIdentifier.firstIdentifierOf(Node.class);
+ ReadWriteTransaction tx = db.newReadWriteTransaction();
+ try {
+ Node node = tx.read(LogicalDatastoreType.OPERATIONAL, nodeIid).checkedGet().get();
+ HAOpClusteredListener.addToCacheIfHAChildNode(nodeIid, node);
+ HAJobScheduler.getInstance().submitJob( () -> {
+ onGlobalNodeAdd(nodeIid, node, tx);
+ });
+ } catch (ReadFailedException e) {
+ LOG.error("Read failed {}",e.getMessage());
+ }
+ }
+ }
+ }
+
+ //The add manager will be called once a new node connects which is handled by the base
+ //HA node listener . In case the node been converted from non-ha to ha or vice versa
+ //it will come as an update and has been handled above.
+ //Hence no functionality has been added to add function here.
+ @Override
+ protected void add(InstanceIdentifier<Managers> instanceIdentifier, Managers managers) {
+ }
+
+ @Override
+ protected ManagerListener getDataTreeChangeListener() {
+ return ManagerListener.this;
+ }
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2017 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netvirt.elan.l2gw.ha.listeners;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.genius.datastoreutils.AsyncDataTreeChangeListenerBase;
+import org.opendaylight.genius.utils.hwvtep.HwvtepHACache;
+import org.opendaylight.netvirt.elan.l2gw.ha.commands.MergeCommand;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Listens for the node children data updates and propagates the updates between ha child and parent nodes.
+ * When an operational child node data is updated, it is copied to parent
+ * When a config parent node data is updated , it is copied to all its children.
+ */
+public abstract class HwvtepNodeDataListener<T extends DataObject>
+ extends AsyncDataTreeChangeListenerBase<T, HwvtepNodeDataListener<T>> implements AutoCloseable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HwvtepNodeDataListener.class);
+
+ private final DataBroker broker;
+ private final MergeCommand mergeCommand;
+ private final LogicalDatastoreType datastoreType;
+
+ public HwvtepNodeDataListener(DataBroker broker,
+ Class<T> clazz,
+ Class<HwvtepNodeDataListener<T>> eventClazz,
+ MergeCommand mergeCommand,
+ LogicalDatastoreType datastoreType) {
+ super(clazz, eventClazz);
+ this.broker = broker;
+ this.mergeCommand = mergeCommand;
+ this.datastoreType = datastoreType;
+ registerListener(this.datastoreType, broker);
+ }
+
+ protected abstract InstanceIdentifier<T> getWildCardPath();
+
+ @Override
+ protected void add(InstanceIdentifier<T> identifier, T dataAdded) {
+ HAJobScheduler.getInstance().submitJob( () -> {
+ try {
+ boolean create = true;
+ ReadWriteTransaction tx = broker.newReadWriteTransaction();
+ if (LogicalDatastoreType.OPERATIONAL == datastoreType) {
+ copyToParent(identifier, dataAdded, create, tx);
+ } else {
+ copyToChild(identifier, dataAdded, create, tx);
+ }
+ CheckedFuture<Void, TransactionCommitFailedException> futures = tx.submit();
+ futures.get();
+ } catch (ReadFailedException e) {
+ LOG.error("Exception caught while writing ", e.getMessage());
+ } catch (InterruptedException e) {
+ LOG.error("Exception caught while writing ", e.getMessage());
+ } catch (ExecutionException e) {
+ LOG.error("Exception caught while writing ", e.getMessage());
+ }
+ });
+ }
+
+ @Override
+ protected void update(InstanceIdentifier<T> key, T before, T after) {
+ HAJobScheduler.getInstance().submitJob( () -> {
+ if (Objects.equals(before, after)) {
+ //incase of cluter reboots tx.put will rewrite the data and fire unnecessary updates
+ return;
+ }
+ add(key, after);
+ });
+ }
+
+ @Override
+ protected void remove(InstanceIdentifier<T> identifier, T dataRemoved) {
+ HAJobScheduler.getInstance().submitJob( () -> {
+ try {
+ boolean create = false;
+ ReadWriteTransaction tx = broker.newReadWriteTransaction();
+ if (LogicalDatastoreType.OPERATIONAL == datastoreType) {
+ copyToParent(identifier, dataRemoved, create, tx);
+ } else {
+ copyToChild(identifier, dataRemoved, create, tx);
+ }
+ CheckedFuture<Void, TransactionCommitFailedException> futures = tx.submit();
+ futures.get();
+ } catch (ReadFailedException e) {
+ LOG.error("Exception caught while writing ", e.getMessage());
+ } catch (InterruptedException e) {
+ LOG.error("Exception caught while writing ", e.getMessage());
+ } catch (ExecutionException e) {
+ LOG.error("Exception caught while writing ", e.getMessage());
+ }
+ });
+ }
+
+ <T extends DataObject> boolean isDataUpdated(Optional<T> existingDataOptional, T newData) {
+ return !existingDataOptional.isPresent() || !Objects.equals(existingDataOptional.get(), newData);
+ }
+
+ <T extends DataObject> void copyToParent(InstanceIdentifier<T> identifier, T data, boolean create,
+ ReadWriteTransaction tx) throws ReadFailedException {
+ InstanceIdentifier<Node> parent = getHAParent(identifier);
+ if (parent == null) {
+ return;
+ }
+ LOG.trace("Copy child op data {} to parent {} create:{}", mergeCommand.getDescription(),
+ getNodeId(parent), create);
+ data = (T) mergeCommand.transform(parent, data);
+ identifier = mergeCommand.generateId(parent, data);
+ Optional<T> existingDataOptional = tx.read(datastoreType, identifier).checkedGet();
+ if (create) {
+ if (isDataUpdated(existingDataOptional, data)) {
+ tx.put(datastoreType, identifier, data);
+ }
+ } else {
+ if (existingDataOptional.isPresent()) {
+ tx.delete(datastoreType, identifier);
+ }
+ }
+ }
+
+ <T extends DataObject> void copyToChild(InstanceIdentifier<T> identifier, T data, boolean create,
+ ReadWriteTransaction tx) throws ReadFailedException {
+ Set<InstanceIdentifier<Node>> children = getChildrenForHANode(identifier);
+ if (children == null) {
+ return;
+ }
+ for (InstanceIdentifier<Node> child : children) {
+ LOG.trace("Copy parent config data {} to child {} create:{} ", mergeCommand.getDescription(),
+ getNodeId(child), create);
+ data = (T) mergeCommand.transform(child, data);
+ identifier = mergeCommand.generateId(child, data);
+ if (create) {
+ tx.put(datastoreType, identifier, data);
+ } else {
+ Optional<T> existingDataOptional = tx.read(datastoreType, identifier).checkedGet();
+ if (existingDataOptional.isPresent()) {
+ tx.delete(datastoreType, identifier);
+ }
+ }
+ }
+ }
+
+ private String getNodeId(InstanceIdentifier<Node> iid) {
+ return iid.firstKeyOf(Node.class).getNodeId().getValue();
+ }
+
+ @Override
+ protected HwvtepNodeDataListener<T> getDataTreeChangeListener() {
+ return HwvtepNodeDataListener.this;
+ }
+
+ protected Set<InstanceIdentifier<Node>> getChildrenForHANode(InstanceIdentifier identifier) {
+ InstanceIdentifier<Node> parent = identifier.firstIdentifierOf(Node.class);
+ return HwvtepHACache.getInstance().getChildrenForHANode(parent);
+ }
+
+ protected InstanceIdentifier<Node> getHAParent(InstanceIdentifier identifier) {
+ InstanceIdentifier<Node> child = identifier.firstIdentifierOf(Node.class);
+ return HwvtepHACache.getInstance().getParent(child);
+ }
+}
<argument ref="dataBroker" />
</bean>
+ <bean id="hAListeners" class="org.opendaylight.netvirt.elan.l2gw.ha.listeners.HAListeners">
+ <argument ref="dataBroker" />
+ </bean>
+
<bean id="haConfigNodeListener" class="org.opendaylight.netvirt.elan.l2gw.ha.listeners.HAConfigNodeListener">
<argument ref="dataBroker" />
<argument ref="haEventHandler" />