import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import javax.annotation.PreDestroy;
-
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.genius.datastoreutils.TaskRetryLooper;
+import org.opendaylight.genius.infra.Datastore;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
+import org.opendaylight.genius.infra.TypedReadWriteTransaction;
import org.opendaylight.genius.utils.hwvtep.HwvtepNodeHACache;
import org.opendaylight.genius.utils.hwvtep.HwvtepSouthboundConstants;
import org.opendaylight.infrautils.metrics.Labeled;
import org.opendaylight.infrautils.metrics.Meter;
import org.opendaylight.infrautils.metrics.MetricDescriptor;
import org.opendaylight.infrautils.metrics.MetricProvider;
-import org.opendaylight.netvirt.elan.l2gw.ha.BatchedTransaction;
+import org.opendaylight.infrautils.utils.concurrent.LoggingFutures;
import org.opendaylight.netvirt.elan.l2gw.ha.HwvtepHAUtil;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalAugmentation;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.PhysicalSwitchAugmentation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public abstract class HwvtepNodeBaseListener implements DataTreeChangeListener<Node>, AutoCloseable {
+public abstract class HwvtepNodeBaseListener<D extends Datastore>
+ implements DataTreeChangeListener<Node>, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(HwvtepNodeBaseListener.class);
private static final int STARTUP_LOOP_TICK = 500;
private final ListenerRegistration<HwvtepNodeBaseListener> registration;
private final DataBroker dataBroker;
+ final ManagedNewTransactionRunner txRunner;
private final HwvtepNodeHACache hwvtepNodeHACache;
- private final MetricProvider metricProvider;
- private final LogicalDatastoreType datastoreType;
+ private final Class<D> datastoreType;
private final Function<DataObject, String> noLogicalSwitch = (data) -> "No_Ls";
private final Labeled<Labeled<Labeled<Labeled<Labeled<Meter>>>>> childModCounter;
private final Labeled<Labeled<Labeled<Meter>>> nodeModCounter;
private final boolean updateMetrics;
- ImmutableMap<Class, Function<DataObject, String>> logicalSwitchExtractor =
- new ImmutableMap.Builder<Class, Function<DataObject, String>>()
- .put(LogicalSwitches.class, data -> ((LogicalSwitches) data).getHwvtepNodeName().getValue())
- .put(RemoteMcastMacs.class, data -> {
- return logicalSwitchNameFromIid(((RemoteMcastMacs) data).key().getLogicalSwitchRef()
- .getValue());
- })
- .put(RemoteUcastMacs.class, data -> {
- return logicalSwitchNameFromIid(((RemoteUcastMacs) data).key().getLogicalSwitchRef()
- .getValue());
- }).build();
+ private static final ImmutableMap<Class, Function<DataObject, String>> LOGICAL_SWITCH_EXTRACTOR =
+ new ImmutableMap.Builder<Class, Function<DataObject, String>>()
+ .put(LogicalSwitches.class, data -> ((LogicalSwitches) data).getHwvtepNodeName().getValue())
+ .put(RemoteMcastMacs.class,
+ data -> logicalSwitchNameFromIid(((RemoteMcastMacs) data).key().getLogicalSwitchRef().getValue()))
+ .put(RemoteUcastMacs.class, data -> logicalSwitchNameFromIid(
+ ((RemoteUcastMacs) data).key().getLogicalSwitchRef().getValue())).build();
- public HwvtepNodeBaseListener(LogicalDatastoreType datastoreType, DataBroker dataBroker,
+ public HwvtepNodeBaseListener(Class<D> datastoreType, DataBroker dataBroker,
HwvtepNodeHACache hwvtepNodeHACache, MetricProvider metricProvider,
boolean updateMetrics) throws Exception {
this.dataBroker = dataBroker;
+ this.txRunner = new ManagedNewTransactionRunnerImpl(dataBroker);
this.datastoreType = datastoreType;
this.hwvtepNodeHACache = hwvtepNodeHACache;
- this.metricProvider = metricProvider;
this.updateMetrics = updateMetrics;
this.childModCounter = metricProvider.newMeter(
MetricDescriptor.builder().anchor(this).project("netvirt").module("l2gw").id("child").build(),
this.nodeModCounter = metricProvider.newMeter(
MetricDescriptor.builder().anchor(this).project("netvirt").module("l2gw").id("node").build(),
"datastore", "modification", "nodeid");
- final DataTreeIdentifier<Node> treeId = new DataTreeIdentifier<>(datastoreType, getWildcardPath());
+ final DataTreeIdentifier<Node> treeId =
+ new DataTreeIdentifier<>(Datastore.toType(datastoreType), getWildcardPath());
TaskRetryLooper looper = new TaskRetryLooper(STARTUP_LOOP_TICK, STARTUP_LOOP_MAX_RETRIES);
registration = looper.loopUntilNoException(() ->
dataBroker.registerDataTreeChangeListener(treeId, HwvtepNodeBaseListener.this));
@Override
public void onDataTreeChanged(final Collection<DataTreeModification<Node>> changes) {
- HAJobScheduler.getInstance().submitJob(() -> {
- ReadWriteTransaction tx = getTx();
- try {
+ HAJobScheduler.getInstance().submitJob(() -> LoggingFutures.addErrorLogging(
+ txRunner.callWithNewReadWriteTransactionAndSubmit(datastoreType, tx -> {
processConnectedNodes(changes, tx);
processUpdatedNodes(changes, tx);
processDisconnectedNodes(changes, tx);
- tx.submit().get();
- } catch (InterruptedException | ExecutionException | ReadFailedException e) {
- LOG.error("Error processing data-tree changes", e);
- }
- });
+ }), LOG, "Error processing data-tree changes"));
}
private void processUpdatedNodes(Collection<DataTreeModification<Node>> changes,
- ReadWriteTransaction tx)
+ TypedReadWriteTransaction<D> tx)
throws ReadFailedException, ExecutionException, InterruptedException {
for (DataTreeModification<Node> change : changes) {
final InstanceIdentifier<Node> key = change.getRootPath().getRootIdentifier();
onGlobalNodeUpdate(key, updated, original, mod, tx);
subMod = change.getRootNode().getModifiedAugmentation(HwvtepGlobalAugmentation.class);
} else {
- onPsNodeUpdate(updated, original, mod, tx);
+ onPsNodeUpdate(updated, mod, tx);
subMod = change.getRootNode().getModifiedAugmentation(PhysicalSwitchAugmentation.class);
}
if (subMod != null) {
}
private String logicalSwitchNameFromChildMod(DataObjectModification<? extends DataObject> childMod) {
- DataObject data = childMod.getDataAfter() != null ? childMod.getDataAfter() : childMod.getDataBefore();
- return logicalSwitchExtractor.getOrDefault(childMod.getModificationType().getClass(), noLogicalSwitch)
+ DataObject dataAfter = childMod.getDataAfter();
+ DataObject data = dataAfter != null ? dataAfter : childMod.getDataBefore();
+ return LOGICAL_SWITCH_EXTRACTOR.getOrDefault(childMod.getModificationType().getClass(), noLogicalSwitch)
.apply(data);
}
- private String logicalSwitchNameFromIid(InstanceIdentifier input) {
+ private static String logicalSwitchNameFromIid(InstanceIdentifier<?> input) {
InstanceIdentifier<LogicalSwitches> iid = (InstanceIdentifier<LogicalSwitches>)input;
return iid.firstKeyOf(LogicalSwitches.class).getHwvtepNodeName().getValue();
}
private void updateCounters(String nodeId,
- Collection<DataObjectModification<? extends DataObject>> childModCollection) {
+ Collection<? extends DataObjectModification<? extends DataObject>> childModCollection) {
if (childModCollection == null || !updateMetrics) {
return;
}
String childClsName = childMod.getDataType().getClass().getSimpleName();
String modificationType = childMod.getModificationType().toString();
String logicalSwitchName = logicalSwitchNameFromChildMod(childMod);
- childModCounter.label(datastoreType.name())
+ childModCounter.label(Datastore.toType(datastoreType).name())
.label(modificationType)
.label(childClsName)
.label(nodeId)
}
private void processDisconnectedNodes(Collection<DataTreeModification<Node>> changes,
- ReadWriteTransaction tx)
+ TypedReadWriteTransaction<D> tx)
throws InterruptedException, ExecutionException, ReadFailedException {
for (DataTreeModification<Node> change : changes) {
final InstanceIdentifier<Node> key = change.getRootPath().getRootIdentifier();
String nodeId = key.firstKeyOf(Node.class).getNodeId().getValue();
if (deleted != null) {
if (updateMetrics) {
- nodeModCounter.label(datastoreType.name())
+ nodeModCounter.label(Datastore.toType(datastoreType).name())
.label(DataObjectModification.ModificationType.DELETE.name()).label(nodeId).mark();
}
if (!nodeId.contains(HwvtepHAUtil.PHYSICALSWITCH)) {
}
void processConnectedNodes(Collection<DataTreeModification<Node>> changes,
- ReadWriteTransaction tx)
- throws ReadFailedException {
+ TypedReadWriteTransaction<D> tx)
+ throws ExecutionException, InterruptedException {
for (DataTreeModification<Node> change : changes) {
InstanceIdentifier<Node> key = change.getRootPath().getRootIdentifier();
DataObjectModification<Node> mod = change.getRootNode();
String nodeId = key.firstKeyOf(Node.class).getNodeId().getValue();
if (node != null) {
if (updateMetrics) {
- nodeModCounter.label(datastoreType.name())
+ nodeModCounter.label(Datastore.toType(datastoreType).name())
.label(DataObjectModification.ModificationType.WRITE.name()).label(nodeId).mark();
}
if (!nodeId.contains(HwvtepHAUtil.PHYSICALSWITCH)) {
}
}
- ReadWriteTransaction getTx() {
- return new BatchedTransaction();
- }
-
//default methods
- void onGlobalNodeDelete(InstanceIdentifier<Node> key, Node added, ReadWriteTransaction tx)
- throws ReadFailedException {
+ void onGlobalNodeDelete(InstanceIdentifier<Node> key, Node added, TypedReadWriteTransaction<D> tx)
+ throws ReadFailedException, ExecutionException, InterruptedException {
}
- void onPsNodeDelete(InstanceIdentifier<Node> key, Node addedPSNode, ReadWriteTransaction tx)
- throws ReadFailedException {
+ void onPsNodeDelete(InstanceIdentifier<Node> key, Node addedPSNode, TypedReadWriteTransaction<D> tx)
+ throws ReadFailedException, ExecutionException, InterruptedException {
}
- void onGlobalNodeAdd(InstanceIdentifier<Node> key, Node added, ReadWriteTransaction tx) {
+ void onGlobalNodeAdd(InstanceIdentifier<Node> key, Node added, TypedReadWriteTransaction<D> tx) {
}
- void onPsNodeAdd(InstanceIdentifier<Node> key, Node addedPSNode, ReadWriteTransaction tx)
- throws ReadFailedException {
+ void onPsNodeAdd(InstanceIdentifier<Node> key, Node addedPSNode, TypedReadWriteTransaction<D> tx)
+ throws InterruptedException, ExecutionException {
}
void onGlobalNodeUpdate(InstanceIdentifier<Node> key, Node updated, Node original,
- DataObjectModification<Node> mod, ReadWriteTransaction tx)
+ DataObjectModification<Node> mod, TypedReadWriteTransaction<D> tx)
throws ReadFailedException, InterruptedException, ExecutionException {
}
- void onPsNodeUpdate(Node updated, Node original,
- DataObjectModification<Node> mod, ReadWriteTransaction tx)
+ void onPsNodeUpdate(Node updated,
+ DataObjectModification<Node> mod, TypedReadWriteTransaction<D> tx)
throws ReadFailedException, InterruptedException, ExecutionException {
}