*/
package org.opendaylight.netvirt.elan.l2gw.ha.handlers;
-import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
-import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL;
+import static org.opendaylight.genius.infra.Datastore.CONFIGURATION;
-import com.google.common.base.Optional;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
import javax.inject.Inject;
import javax.inject.Singleton;
-
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.genius.utils.hwvtep.HwvtepHACache;
-import org.opendaylight.netvirt.elan.l2gw.ha.BatchedTransaction;
+import org.opendaylight.genius.infra.Datastore;
+import org.opendaylight.genius.infra.Datastore.Configuration;
+import org.opendaylight.genius.infra.Datastore.Operational;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
+import org.opendaylight.genius.infra.TypedReadWriteTransaction;
+import org.opendaylight.infrautils.utils.concurrent.LoggingFutures;
+import org.opendaylight.mdsal.binding.api.DataBroker;
import org.opendaylight.netvirt.elan.l2gw.ha.HwvtepHAUtil;
import org.opendaylight.netvirt.elan.l2gw.ha.listeners.HAJobScheduler;
import org.opendaylight.netvirt.elan.l2gw.ha.merge.GlobalAugmentationMerger;
import org.slf4j.LoggerFactory;
@Singleton
-public class NodeCopier implements INodeCopier {
+public class NodeCopier {
- static Logger LOG = LoggerFactory.getLogger(NodeCopier.class);
+ private static final Logger LOG = LoggerFactory.getLogger(NodeCopier.class);
- GlobalAugmentationMerger globalAugmentationMerger = GlobalAugmentationMerger.getInstance();
- PSAugmentationMerger psAugmentationMerger = PSAugmentationMerger.getInstance();
- GlobalNodeMerger globalNodeMerger = GlobalNodeMerger.getInstance();
- PSNodeMerger psNodeMerger = PSNodeMerger.getInstance();
- DataBroker db;
- HwvtepHACache hwvtepHACache = HwvtepHACache.getInstance();
+ private final GlobalAugmentationMerger globalAugmentationMerger = GlobalAugmentationMerger.getInstance();
+ private final PSAugmentationMerger psAugmentationMerger = PSAugmentationMerger.getInstance();
+ private final GlobalNodeMerger globalNodeMerger = GlobalNodeMerger.getInstance();
+ private final PSNodeMerger psNodeMerger = PSNodeMerger.getInstance();
+ private final ManagedNewTransactionRunner txRunner;
@Inject
public NodeCopier(DataBroker db) {
- this.db = db;
+ this.txRunner = new ManagedNewTransactionRunnerImpl(db);
}
- public void copyGlobalNode(Optional<Node> srcGlobalNodeOptional,
- InstanceIdentifier<Node> srcPath,
- InstanceIdentifier<Node> dstPath,
- LogicalDatastoreType logicalDatastoreType,
- ReadWriteTransaction tx) throws ReadFailedException {
- if (!srcGlobalNodeOptional.isPresent() && logicalDatastoreType == CONFIGURATION) {
- Futures.addCallback(tx.read(logicalDatastoreType, srcPath), new FutureCallback<Optional<Node>>() {
+ public <D extends Datastore> void copyGlobalNode(Optional<Node> srcGlobalNodeOptional,
+ InstanceIdentifier<Node> srcPath,
+ InstanceIdentifier<Node> dstPath,
+ Class<D> datastoreType,
+ TypedReadWriteTransaction<D> tx)
+ throws ExecutionException, InterruptedException {
+ if (!srcGlobalNodeOptional.isPresent() && Configuration.class.equals(datastoreType)) {
+ Futures.addCallback(tx.read(srcPath), new FutureCallback<Optional<Node>>() {
@Override
public void onSuccess(Optional<Node> nodeOptional) {
- HAJobScheduler.getInstance().submitJob(() -> {
- try {
- ReadWriteTransaction tx1 = new BatchedTransaction();
+ HAJobScheduler.getInstance().submitJob(() -> LoggingFutures.addErrorLogging(
+ txRunner.callWithNewReadWriteTransactionAndSubmit(datastoreType, tx -> {
if (nodeOptional.isPresent()) {
- copyGlobalNode(nodeOptional, srcPath, dstPath, logicalDatastoreType, tx1);
+ copyGlobalNode(nodeOptional, srcPath, dstPath, datastoreType, tx);
} else {
- /**
+ /*
* In case the Parent HA Global Node is not present and Child HA node is present
- * It means that both the child are disconnected/removed hence the parent is deleted.
+ * It means that both the child are disconnected/removed hence the parent is
+ * deleted.
* @see org.opendaylight.netvirt.elan.l2gw.ha.listeners.HAOpNodeListener
* OnGLobalNode() delete function
* So we should delete the existing config child node as cleanup
*/
- HwvtepHAUtil.deleteNodeIfPresent(tx1, logicalDatastoreType, dstPath);
+ HwvtepHAUtil.deleteNodeIfPresent(tx, dstPath);
}
- } catch (ReadFailedException e) {
- LOG.error("Failed to read source node {}",srcPath);
- }
- });
+ }), LOG, "Failed to read source node {}", srcPath));
}
@Override
public void onFailure(Throwable throwable) {
}
- });
+ }, MoreExecutors.directExecutor());
return;
}
HwvtepGlobalAugmentation srcGlobalAugmentation =
- srcGlobalNodeOptional.get().getAugmentation(HwvtepGlobalAugmentation.class);
+ srcGlobalNodeOptional.get().augmentation(HwvtepGlobalAugmentation.class);
if (srcGlobalAugmentation == null) {
- /**
+ /*
* If Source HA Global Node is not present
* It means that both the child are disconnected/removed hence the parent is deleted.
* @see org.opendaylight.netvirt.elan.l2gw.ha.listeners.HAOpNodeListener OnGLobalNode() delete function
* So we should delete the existing config child node as cleanup
*/
- HwvtepHAUtil.deleteNodeIfPresent(tx, logicalDatastoreType, dstPath);
+ HwvtepHAUtil.deleteNodeIfPresent(tx, dstPath);
return;
}
NodeBuilder haNodeBuilder = HwvtepHAUtil.getNodeBuilderForPath(dstPath);
HwvtepGlobalAugmentationBuilder haBuilder = new HwvtepGlobalAugmentationBuilder();
- Optional<Node> existingDstGlobalNodeOptional = tx.read(logicalDatastoreType, dstPath).checkedGet();
+ Optional<Node> existingDstGlobalNodeOptional = tx.read(dstPath).get();
Node existingDstGlobalNode =
existingDstGlobalNodeOptional.isPresent() ? existingDstGlobalNodeOptional.get() : null;
HwvtepGlobalAugmentation existingHAGlobalData = HwvtepHAUtil.getGlobalAugmentationOfNode(existingDstGlobalNode);
- globalAugmentationMerger.mergeOperationalData(haBuilder, existingHAGlobalData, srcGlobalAugmentation, dstPath);
- globalNodeMerger.mergeOperationalData(haNodeBuilder,
- existingDstGlobalNode, srcGlobalNodeOptional.get(), dstPath);
-
- if (OPERATIONAL == logicalDatastoreType) {
+ if (Operational.class.equals(datastoreType)) {
+ globalAugmentationMerger.mergeOperationalData(haBuilder, existingHAGlobalData, srcGlobalAugmentation,
+ dstPath);
+ globalNodeMerger.mergeOperationalData(haNodeBuilder,
+ existingDstGlobalNode, srcGlobalNodeOptional.get(), dstPath);
haBuilder.setManagers(HwvtepHAUtil.buildManagersForHANode(srcGlobalNodeOptional.get(),
existingDstGlobalNodeOptional));
//Also update the manager section in config which helps in cluster reboot scenarios
- haBuilder.getManagers().stream().forEach((manager) -> {
- InstanceIdentifier<Managers> managerIid = dstPath.augmentation(HwvtepGlobalAugmentation.class)
- .child(Managers.class, manager.getKey());
- tx.put(CONFIGURATION, managerIid, manager, true);
- });
+ LoggingFutures.addErrorLogging(
+ txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION,
+ confTx -> haBuilder.getManagers().values().forEach(manager -> {
+ InstanceIdentifier<Managers> managerIid =
+ dstPath.augmentation(HwvtepGlobalAugmentation.class).child(Managers.class, manager.key());
+ confTx.mergeParentStructurePut(managerIid, manager);
+ })), LOG, "Error updating the manager section in config");
+ } else {
+ globalAugmentationMerger.mergeConfigData(haBuilder, srcGlobalAugmentation, dstPath);
+ globalNodeMerger.mergeConfigData(haNodeBuilder, srcGlobalNodeOptional.get(), dstPath);
}
+
haBuilder.setDbVersion(srcGlobalAugmentation.getDbVersion());
haNodeBuilder.addAugmentation(HwvtepGlobalAugmentation.class, haBuilder.build());
Node haNode = haNodeBuilder.build();
- if (OPERATIONAL == logicalDatastoreType) {
- tx.merge(logicalDatastoreType, dstPath, haNode, true);
+ if (Operational.class.equals(datastoreType)) {
+ tx.mergeParentStructureMerge(dstPath, haNode);
} else {
- tx.put(logicalDatastoreType, dstPath, haNode, true);
+ tx.mergeParentStructurePut(dstPath, haNode);
}
}
- public void copyPSNode(Optional<Node> srcPsNodeOptional,
+ public <D extends Datastore> void copyPSNode(Optional<Node> srcPsNodeOptional,
InstanceIdentifier<Node> srcPsPath,
InstanceIdentifier<Node> dstPsPath,
InstanceIdentifier<Node> dstGlobalPath,
- LogicalDatastoreType logicalDatastoreType,
- ReadWriteTransaction tx) throws ReadFailedException {
- if (!srcPsNodeOptional.isPresent() && logicalDatastoreType == CONFIGURATION) {
- Futures.addCallback(tx.read(logicalDatastoreType, srcPsPath), new FutureCallback<Optional<Node>>() {
+ Class<D> datastoreType,
+ TypedReadWriteTransaction<D> tx)
+ throws ExecutionException, InterruptedException {
+ if (!srcPsNodeOptional.isPresent() && Configuration.class.equals(datastoreType)) {
+ Futures.addCallback(tx.read(srcPsPath), new FutureCallback<Optional<Node>>() {
@Override
public void onSuccess(Optional<Node> nodeOptional) {
HAJobScheduler.getInstance().submitJob(() -> {
- try {
- ReadWriteTransaction tx1 = new BatchedTransaction();
- if (nodeOptional.isPresent()) {
- copyPSNode(nodeOptional,
- srcPsPath, dstPsPath, dstGlobalPath, logicalDatastoreType, tx1);
- } else {
- /**
- * Deleting node please refer @see #copyGlobalNode for explanation
- */
- HwvtepHAUtil.deleteNodeIfPresent(tx1, logicalDatastoreType, dstPsPath);
- }
- } catch (ReadFailedException e) {
- LOG.error("Failed to read src node {}", srcPsNodeOptional.get());
- }
+ LoggingFutures.addErrorLogging(
+ txRunner.callWithNewReadWriteTransactionAndSubmit(datastoreType, tx -> {
+ if (nodeOptional.isPresent()) {
+ copyPSNode(nodeOptional,
+ srcPsPath, dstPsPath, dstGlobalPath, datastoreType, tx);
+ } else {
+ /*
+ * Deleting node please refer @see #copyGlobalNode for explanation
+ */
+ HwvtepHAUtil.deleteNodeIfPresent(tx, dstPsPath);
+ }
+ }), LOG, "Failed to read source node {}", srcPsPath);
});
}
@Override
public void onFailure(Throwable throwable) {
}
- });
+ }, MoreExecutors.directExecutor());
return;
}
NodeBuilder dstPsNodeBuilder = HwvtepHAUtil.getNodeBuilderForPath(dstPsPath);
PhysicalSwitchAugmentationBuilder dstPsAugmentationBuilder = new PhysicalSwitchAugmentationBuilder();
PhysicalSwitchAugmentation srcPsAugmenatation =
- srcPsNodeOptional.get().getAugmentation(PhysicalSwitchAugmentation.class);
+ srcPsNodeOptional.get().augmentation(PhysicalSwitchAugmentation.class);
- Node existingDstPsNode = HwvtepHAUtil.readNode(tx, logicalDatastoreType, dstPsPath);
+ Node existingDstPsNode = tx.read(dstPsPath).get().orElse(null);
PhysicalSwitchAugmentation existingDstPsAugmentation =
HwvtepHAUtil.getPhysicalSwitchAugmentationOfNode(existingDstPsNode);
- if (OPERATIONAL == logicalDatastoreType) {
+ if (Operational.class.equals(datastoreType)) {
psAugmentationMerger.mergeOperationalData(dstPsAugmentationBuilder, existingDstPsAugmentation,
srcPsAugmenatation, dstPsPath);
psNodeMerger.mergeOperationalData(dstPsNodeBuilder, existingDstPsNode, srcPsNodeOptional.get(), dstPsPath);
dstPsNodeBuilder.addAugmentation(PhysicalSwitchAugmentation.class, dstPsAugmentationBuilder.build());
Node dstPsNode = dstPsNodeBuilder.build();
- tx.merge(logicalDatastoreType, dstPsPath, dstPsNode, true);
- LOG.debug("Copied {} physical switch node from {} to {}", logicalDatastoreType, srcPsPath, dstPsPath);
+ tx.mergeParentStructureMerge(dstPsPath, dstPsNode);
+ LOG.debug("Copied {} physical switch node from {} to {}", datastoreType, srcPsPath, dstPsPath);
}
public void mergeOpManagedByAttributes(PhysicalSwitchAugmentation psAugmentation,