package org.opendaylight.netvirt.elan.l2gw.ha.listeners;
import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import javax.annotation.PreDestroy;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
-import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
-import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
-import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.genius.datastoreutils.TaskRetryLooper;
import org.opendaylight.genius.utils.hwvtep.HwvtepNodeHACache;
import org.opendaylight.genius.utils.hwvtep.HwvtepSouthboundConstants;
import org.opendaylight.infrautils.metrics.Meter;
import org.opendaylight.infrautils.metrics.MetricDescriptor;
import org.opendaylight.infrautils.metrics.MetricProvider;
-import org.opendaylight.netvirt.elan.l2gw.ha.BatchedTransaction;
+import org.opendaylight.infrautils.utils.concurrent.LoggingFutures;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.DataObjectModification;
+import org.opendaylight.mdsal.binding.api.DataTreeChangeListener;
+import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
+import org.opendaylight.mdsal.binding.api.DataTreeModification;
+import org.opendaylight.mdsal.binding.util.Datastore;
+import org.opendaylight.mdsal.binding.util.ManagedNewTransactionRunner;
+import org.opendaylight.mdsal.binding.util.ManagedNewTransactionRunnerImpl;
+import org.opendaylight.mdsal.binding.util.TypedReadWriteTransaction;
import org.opendaylight.netvirt.elan.l2gw.ha.HwvtepHAUtil;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalAugmentation;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.PhysicalSwitchAugmentation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public abstract class HwvtepNodeBaseListener implements DataTreeChangeListener<Node>, AutoCloseable {
+public abstract class HwvtepNodeBaseListener<D extends Datastore>
+ implements DataTreeChangeListener<Node>, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(HwvtepNodeBaseListener.class);
private static final int STARTUP_LOOP_TICK = 500;
private static final int STARTUP_LOOP_MAX_RETRIES = 8;
- private final ListenerRegistration<HwvtepNodeBaseListener> registration;
+ private ListenerRegistration<HwvtepNodeBaseListener> registration;
private final DataBroker dataBroker;
+ final ManagedNewTransactionRunner txRunner;
private final HwvtepNodeHACache hwvtepNodeHACache;
- private final MetricProvider metricProvider;
- private final LogicalDatastoreType datastoreType;
+ private final Class<D> datastoreType;
private final Function<DataObject, String> noLogicalSwitch = (data) -> "No_Ls";
private final Labeled<Labeled<Labeled<Labeled<Labeled<Meter>>>>> childModCounter;
private final Labeled<Labeled<Labeled<Meter>>> nodeModCounter;
private final boolean updateMetrics;
- ImmutableMap<Class, Function<DataObject, String>> logicalSwitchExtractor =
- new ImmutableMap.Builder<Class, Function<DataObject, String>>()
- .put(LogicalSwitches.class, data -> ((LogicalSwitches) data).getHwvtepNodeName().getValue())
- .put(RemoteMcastMacs.class, data -> {
- return logicalSwitchNameFromIid(((RemoteMcastMacs) data).key().getLogicalSwitchRef()
- .getValue());
- })
- .put(RemoteUcastMacs.class, data -> {
- return logicalSwitchNameFromIid(((RemoteUcastMacs) data).key().getLogicalSwitchRef()
- .getValue());
- }).build();
+ private static final ImmutableMap<Class, Function<DataObject, String>> LOGICAL_SWITCH_EXTRACTOR =
+ new ImmutableMap.Builder<Class, Function<DataObject, String>>()
+ .put(LogicalSwitches.class, data -> ((LogicalSwitches) data).getHwvtepNodeName().getValue())
+ .put(RemoteMcastMacs.class,
+ data -> logicalSwitchNameFromIid(((RemoteMcastMacs) data).key().getLogicalSwitchRef().getValue()))
+ .put(RemoteUcastMacs.class, data -> logicalSwitchNameFromIid(
+ ((RemoteUcastMacs) data).key().getLogicalSwitchRef().getValue())).build();
- public HwvtepNodeBaseListener(LogicalDatastoreType datastoreType, DataBroker dataBroker,
+ public HwvtepNodeBaseListener(Class<D> datastoreType, DataBroker dataBroker,
HwvtepNodeHACache hwvtepNodeHACache, MetricProvider metricProvider,
boolean updateMetrics) throws Exception {
this.dataBroker = dataBroker;
+ this.txRunner = new ManagedNewTransactionRunnerImpl(dataBroker);
this.datastoreType = datastoreType;
this.hwvtepNodeHACache = hwvtepNodeHACache;
- this.metricProvider = metricProvider;
this.updateMetrics = updateMetrics;
this.childModCounter = metricProvider.newMeter(
MetricDescriptor.builder().anchor(this).project("netvirt").module("l2gw").id("child").build(),
this.nodeModCounter = metricProvider.newMeter(
MetricDescriptor.builder().anchor(this).project("netvirt").module("l2gw").id("node").build(),
"datastore", "modification", "nodeid");
- final DataTreeIdentifier<Node> treeId = new DataTreeIdentifier<>(datastoreType, getWildcardPath());
+ registerListener(datastoreType, dataBroker);
+ }
+
+ protected void registerListener(Class<D> dsType, DataBroker broker) throws Exception {
+ final DataTreeIdentifier<Node> treeId = DataTreeIdentifier.create(Datastore.toType(dsType),
+ getWildcardPath());
TaskRetryLooper looper = new TaskRetryLooper(STARTUP_LOOP_TICK, STARTUP_LOOP_MAX_RETRIES);
registration = looper.loopUntilNoException(() ->
- dataBroker.registerDataTreeChangeListener(treeId, HwvtepNodeBaseListener.this));
+ broker.registerDataTreeChangeListener(treeId, HwvtepNodeBaseListener.this));
}
protected DataBroker getDataBroker() {
List<Managers> up = null;
List<Managers> be = null;
if (updatedAugmentaion != null) {
- up = updatedAugmentaion.getManagers();
+ up = new ArrayList<Managers>(updatedAugmentaion.nonnullManagers().values());
}
if (beforeAugmentaion != null) {
- be = beforeAugmentaion.getManagers();
+ be = new ArrayList<Managers>(beforeAugmentaion.nonnullManagers().values());
}
- if (up != null && be != null && up.size() > 0 && be.size() > 0) {
+ if (up != null) {
Managers m1 = up.get(0);
Managers m2 = be.get(0);
- if (!m1.equals(m2)) {
+ if (!Objects.equals(m1, m2)) {
LOG.trace("Manager entry updated for node {} ", updatedChildNode.getNodeId().getValue());
HwvtepHAUtil.addToCacheIfHAChildNode(childPath, updatedChildNode, hwvtepNodeHACache);
}
@Override
public void onDataTreeChanged(final Collection<DataTreeModification<Node>> changes) {
- HAJobScheduler.getInstance().submitJob(() -> {
- ReadWriteTransaction tx = getTx();
- try {
+ HAJobScheduler.getInstance().submitJob(() -> LoggingFutures.addErrorLogging(
+ txRunner.callWithNewReadWriteTransactionAndSubmit(datastoreType, tx -> {
processConnectedNodes(changes, tx);
processUpdatedNodes(changes, tx);
processDisconnectedNodes(changes, tx);
- tx.submit().get();
- } catch (InterruptedException | ExecutionException | ReadFailedException e) {
- LOG.error("Error processing data-tree changes", e);
- }
- });
+ }), LOG, "Error processing data-tree changes"));
}
private void processUpdatedNodes(Collection<DataTreeModification<Node>> changes,
- ReadWriteTransaction tx) {
+ TypedReadWriteTransaction<D> tx)
+ throws ExecutionException, InterruptedException {
for (DataTreeModification<Node> change : changes) {
final InstanceIdentifier<Node> key = change.getRootPath().getRootIdentifier();
final DataObjectModification<Node> mod = change.getRootNode();
onGlobalNodeUpdate(key, updated, original, mod, tx);
subMod = change.getRootNode().getModifiedAugmentation(HwvtepGlobalAugmentation.class);
} else {
- onPsNodeUpdate(updated, original, mod, tx);
+ onPsNodeUpdate(updated, mod, tx);
subMod = change.getRootNode().getModifiedAugmentation(PhysicalSwitchAugmentation.class);
}
if (subMod != null) {
}
private String logicalSwitchNameFromChildMod(DataObjectModification<? extends DataObject> childMod) {
- DataObject data = childMod.getDataAfter() != null ? childMod.getDataAfter() : childMod.getDataBefore();
- return logicalSwitchExtractor.getOrDefault(childMod.getModificationType().getClass(), noLogicalSwitch)
+ DataObject dataAfter = childMod.getDataAfter();
+ DataObject data = dataAfter != null ? dataAfter : childMod.getDataBefore();
+ return LOGICAL_SWITCH_EXTRACTOR.getOrDefault(childMod.getModificationType().getClass(), noLogicalSwitch)
.apply(data);
}
String childClsName = childMod.getDataType().getClass().getSimpleName();
String modificationType = childMod.getModificationType().toString();
String logicalSwitchName = logicalSwitchNameFromChildMod(childMod);
- childModCounter.label(datastoreType.name())
+ childModCounter.label(Datastore.toType(datastoreType).name())
.label(modificationType)
.label(childClsName)
.label(nodeId)
}
private void processDisconnectedNodes(Collection<DataTreeModification<Node>> changes,
- ReadWriteTransaction tx)
- throws ReadFailedException {
+ TypedReadWriteTransaction<D> tx)
+ throws InterruptedException, ExecutionException {
for (DataTreeModification<Node> change : changes) {
final InstanceIdentifier<Node> key = change.getRootPath().getRootIdentifier();
final DataObjectModification<Node> mod = change.getRootNode();
String nodeId = key.firstKeyOf(Node.class).getNodeId().getValue();
if (deleted != null) {
if (updateMetrics) {
- nodeModCounter.label(datastoreType.name())
+ nodeModCounter.label(Datastore.toType(datastoreType).name())
.label(DataObjectModification.ModificationType.DELETE.name()).label(nodeId).mark();
}
if (!nodeId.contains(HwvtepHAUtil.PHYSICALSWITCH)) {
}
void processConnectedNodes(Collection<DataTreeModification<Node>> changes,
- ReadWriteTransaction tx)
- throws ReadFailedException {
+ TypedReadWriteTransaction<D> tx)
+ throws ExecutionException, InterruptedException {
for (DataTreeModification<Node> change : changes) {
InstanceIdentifier<Node> key = change.getRootPath().getRootIdentifier();
DataObjectModification<Node> mod = change.getRootNode();
String nodeId = key.firstKeyOf(Node.class).getNodeId().getValue();
if (node != null) {
if (updateMetrics) {
- nodeModCounter.label(datastoreType.name())
+ nodeModCounter.label(Datastore.toType(datastoreType).name())
.label(DataObjectModification.ModificationType.WRITE.name()).label(nodeId).mark();
}
if (!nodeId.contains(HwvtepHAUtil.PHYSICALSWITCH)) {
}
}
- private InstanceIdentifier<Node> getWildcardPath() {
+ private static InstanceIdentifier<Node> getWildcardPath() {
InstanceIdentifier<Node> path = InstanceIdentifier
.create(NetworkTopology.class)
.child(Topology.class, new TopologyKey(HwvtepSouthboundConstants.HWVTEP_TOPOLOGY_ID))
}
}
- ReadWriteTransaction getTx() {
- return new BatchedTransaction();
- }
-
//default methods
- void onGlobalNodeDelete(InstanceIdentifier<Node> key, Node added, ReadWriteTransaction tx)
- throws ReadFailedException {
+ void onGlobalNodeDelete(InstanceIdentifier<Node> key, Node added, TypedReadWriteTransaction<D> tx)
+ throws ExecutionException, InterruptedException {
}
- void onPsNodeDelete(InstanceIdentifier<Node> key, Node addedPSNode, ReadWriteTransaction tx)
- throws ReadFailedException {
+ void onPsNodeDelete(InstanceIdentifier<Node> key, Node addedPSNode, TypedReadWriteTransaction<D> tx)
+ throws ExecutionException, InterruptedException {
}
- void onGlobalNodeAdd(InstanceIdentifier<Node> key, Node added, ReadWriteTransaction tx) {
+ void onGlobalNodeAdd(InstanceIdentifier<Node> key, Node added, TypedReadWriteTransaction<D> tx) {
}
- void onPsNodeAdd(InstanceIdentifier<Node> key, Node addedPSNode, ReadWriteTransaction tx)
- throws ReadFailedException {
+ void onPsNodeAdd(InstanceIdentifier<Node> key, Node addedPSNode, TypedReadWriteTransaction<D> tx)
+ throws InterruptedException, ExecutionException {
}
void onGlobalNodeUpdate(InstanceIdentifier<Node> key, Node updated, Node original,
- DataObjectModification<Node> mod, ReadWriteTransaction tx) {
+ DataObjectModification<Node> mod, TypedReadWriteTransaction<D> tx) {
}
- void onPsNodeUpdate(Node updated, Node original,
- DataObjectModification<Node> mod, ReadWriteTransaction tx) {
+ void onPsNodeUpdate(Node updated,
+ DataObjectModification<Node> mod, TypedReadWriteTransaction<D> tx) {
}