/*
- * Copyright (c) 2017 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
+ * Copyright (c) 2020 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
*/
package org.opendaylight.netvirt.elan.l2gw.ha;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FluentFuture;
+import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
-import org.eclipse.jdt.annotation.NonNull;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
import org.opendaylight.genius.utils.batching.ResourceBatchingManager;
-import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.mdsal.binding.util.Datastore;
+import org.opendaylight.mdsal.binding.util.Datastore.Configuration;
+import org.opendaylight.mdsal.binding.util.TypedReadWriteTransaction;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.netvirt.elan.l2gw.ha.listeners.HAJobScheduler;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yangtools.util.concurrent.FluentFutures;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public class BatchedTransaction implements ReadWriteTransaction {
+public class BatchedTransaction<D extends Datastore> implements TypedReadWriteTransaction<D> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BatchedTransaction.class);
+ private static Map<InstanceIdentifier, ListenableFuture<Void>> configInProgress = new ConcurrentHashMap<>();
+ private static Map<InstanceIdentifier, ListenableFuture<Void>> opInProgress = new ConcurrentHashMap<>();
+
+ private Map<InstanceIdentifier, ListenableFuture<Void>> currentOps = new ConcurrentHashMap<>();
+
+ private SettableFuture<Void> result = SettableFuture.create();
+ private boolean updateMetric;
+ private NodeId srcNodeId;
+ private Class<D> type;
+
+ public BatchedTransaction(Class<D> logicalDatastoreType) {
+ this.type = logicalDatastoreType;
+ }
+
+ public ListenableFuture<Void> getResult() {
+ return result;
+ }
@Override
- public <T extends DataObject> CheckedFuture<Optional<T>, ReadFailedException> read(
- LogicalDatastoreType logicalDatastoreType, InstanceIdentifier<T> instanceIdentifier) {
- return ResourceBatchingManager.getInstance().read(getShard(logicalDatastoreType).name(), instanceIdentifier);
+ public <T extends DataObject> FluentFuture<Optional<T>> read(InstanceIdentifier<T> instanceIdentifier) {
+ try {
+ return ResourceBatchingManager.getInstance()
+ .read(getShard().name(), instanceIdentifier);
+ } catch (ExecutionException | InterruptedException e) {
+ LOG.error("BatchTxn failed to Read {}", instanceIdentifier);
+ }
+ return FluentFutures.immediateFailedFluentFuture(new Throwable());
}
- ResourceBatchingManager.ShardResource getShard(LogicalDatastoreType logicalDatastoreType) {
- if (logicalDatastoreType == LogicalDatastoreType.CONFIGURATION) {
+ @Override
+ public FluentFuture<Boolean> exists(InstanceIdentifier<?> path) {
+ // NOT SUPPORTED
+ return FluentFutures.immediateFailedFluentFuture(new Throwable());
+ }
+
+ ResourceBatchingManager.ShardResource getShard() {
+ if (Configuration.class.equals(type)) {
return ResourceBatchingManager.ShardResource.CONFIG_TOPOLOGY;
}
return ResourceBatchingManager.ShardResource.OPERATIONAL_TOPOLOGY;
}
- @Override
- public <T extends DataObject> void put(
- LogicalDatastoreType logicalDatastoreType, InstanceIdentifier<T> instanceIdentifier, T dataObj) {
- ResourceBatchingManager.getInstance().put(getShard(logicalDatastoreType), instanceIdentifier, dataObj);
+ public static synchronized <D extends Datastore> void markUpdateInProgress(Class<D> type,
+ InstanceIdentifier instanceIdentifier, ListenableFuture<Void> ft) {
+ markUpdateInProgress(type, instanceIdentifier, ft, "");
+
}
- @Override
- public <T extends DataObject> void put(LogicalDatastoreType logicalDatastoreType,
- InstanceIdentifier<T> instanceIdentifier, T dataObj, boolean flag) {
- ResourceBatchingManager.getInstance().put(getShard(logicalDatastoreType), instanceIdentifier, dataObj);
+ public static synchronized <D extends Datastore> void markUpdateInProgress(Class<D> type,
+ InstanceIdentifier instanceIdentifier,ListenableFuture<Void> ft, String desc) {
+ if (Configuration.class.equals(type)) {
+// NodeKey nodeKey = (NodeKey) instanceIdentifier.firstKeyOf(Node.class);
+ configInProgress.put(instanceIdentifier, ft);
+ Futures.addCallback(ft, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(Void result) {
+ configInProgress.remove(instanceIdentifier);
+ }
+
+ @Override
+ public void onFailure(Throwable throwable) {
+ configInProgress.remove(instanceIdentifier);
+ LOG.error("Failed to update mdsal op {}", instanceIdentifier, throwable);
+ }
+ }, MoreExecutors.directExecutor());
+ } else {
+ opInProgress.put(instanceIdentifier, ft);
+ Futures.addCallback(ft, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(Void result) {
+ opInProgress.remove(instanceIdentifier);
+ }
+
+ @Override
+ public void onFailure(Throwable throwable) {
+ opInProgress.remove(instanceIdentifier);
+ }
+ }, MoreExecutors.directExecutor());
+ }
}
- @Override
- public <T extends DataObject> void merge(LogicalDatastoreType logicalDatastoreType,
- InstanceIdentifier<T> instanceIdentifier, T dataObj) {
- ResourceBatchingManager.getInstance().merge(getShard(logicalDatastoreType), instanceIdentifier, dataObj);
+ public static synchronized boolean isInProgress(LogicalDatastoreType logicalDatastoreType,
+ InstanceIdentifier instanceIdentifier) {
+
+ ListenableFuture<Void> ft = getInprogressFt(logicalDatastoreType, instanceIdentifier);
+ return ft != null && !ft.isDone() && !ft.isCancelled();
+ }
+
+ public static synchronized boolean addCallbackIfInProgress(LogicalDatastoreType logicalDatastoreType,
+ InstanceIdentifier instanceIdentifier,
+ Runnable runnable) {
+
+ ListenableFuture<Void> ft = getInprogressFt(logicalDatastoreType, instanceIdentifier);
+ if (ft != null && !ft.isDone() && !ft.isCancelled()) {
+ Futures.addCallback(ft, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(Void result) {
+ HAJobScheduler.getInstance().submitJob(runnable);
+ }
+
+ @Override
+ public void onFailure(Throwable throwable) {
+ HAJobScheduler.getInstance().submitJob(runnable);
+ }
+ }, MoreExecutors.directExecutor());
+ return true;
+ }
+ if (logicalDatastoreType == LogicalDatastoreType.CONFIGURATION) {
+ configInProgress.remove(instanceIdentifier);
+ } else {
+ opInProgress.remove(instanceIdentifier);
+ }
+ return false;
+ }
+
+ static ListenableFuture<Void> getInprogressFt(LogicalDatastoreType logicalDatastoreType,
+ InstanceIdentifier instanceIdentifier) {
+ if (logicalDatastoreType == LogicalDatastoreType.CONFIGURATION) {
+ return configInProgress.get(instanceIdentifier);
+ } else {
+ return opInProgress.get(instanceIdentifier);
+ }
+ }
+
+ public void waitForCompletion() {
+ if (currentOps.isEmpty()) {
+ return;
+ }
+ Collection<ListenableFuture<Void>> fts = currentOps.values();
+ for (ListenableFuture<Void> ft : fts) {
+ try {
+ ft.get();
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("Failed to get ft result ", e);
+ }
+ }
}
@Override
- public <T extends DataObject> void merge(LogicalDatastoreType logicalDatastoreType,
- InstanceIdentifier<T> instanceIdentifier, T dataObj, boolean flag) {
- ResourceBatchingManager.getInstance().merge(getShard(logicalDatastoreType), instanceIdentifier, dataObj);
+ public <T extends DataObject> void put(InstanceIdentifier<T> instanceIdentifier, T dataObj) {
+ ListenableFuture<Void> ft = ResourceBatchingManager.getInstance().put(getShard(),
+ instanceIdentifier, dataObj);
+ markUpdateInProgress(type, instanceIdentifier, ft);
+ currentOps.put(instanceIdentifier, ft);
}
@Override
- public boolean cancel() {
- return false;
+ public <T extends DataObject> void mergeParentStructurePut(InstanceIdentifier<T> path, T data) {
+
}
@Override
- public void delete(LogicalDatastoreType logicalDatastoreType, InstanceIdentifier<?> instanceIdentifier) {
- ResourceBatchingManager.getInstance().delete(getShard(logicalDatastoreType), instanceIdentifier);
+ public <T extends DataObject> void merge(InstanceIdentifier<T> instanceIdentifier, T dataObj) {
+ ListenableFuture<Void> ft = ResourceBatchingManager.getInstance().merge(getShard(),
+ instanceIdentifier, dataObj);
+ markUpdateInProgress(type, instanceIdentifier, ft);
+ currentOps.put(instanceIdentifier, ft);
}
@Override
- public CheckedFuture<Void, TransactionCommitFailedException> submit() {
- return Futures.immediateCheckedFuture(null);
+ public <T extends DataObject> void mergeParentStructureMerge(InstanceIdentifier<T> path,
+ T data) {
+ //NOT SUPPORTED
+ }
+
+ public ListenableFuture<Void> getFt(InstanceIdentifier instanceIdentifier) {
+ return currentOps.get(instanceIdentifier);
}
@Override
- public @NonNull FluentFuture<? extends @NonNull CommitInfo> commit() {
- return CommitInfo.emptyFluentFuture();
+ public void delete(InstanceIdentifier<?> instanceIdentifier) {
+ ListenableFuture<Void> ft = ResourceBatchingManager.getInstance().delete(getShard(),
+ instanceIdentifier);
+ markUpdateInProgress(type, instanceIdentifier, ft);
+ currentOps.put(instanceIdentifier, ft);
+ }
+
+ public ListenableFuture<Void> submit() {
+ if (currentOps.isEmpty()) {
+ return Futures.immediateFuture(null);
+ }
+ Collection<ListenableFuture<Void>> fts = currentOps.values();
+ AtomicInteger waitCount = new AtomicInteger(fts.size());
+ for (ListenableFuture<Void> ft : fts) {
+ Futures.addCallback(ft, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(Void voidResult) {
+ if (waitCount.decrementAndGet() == 0) {
+ result.set(null);
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable throwable) {
+ result.setException(throwable);
+ }
+ }, MoreExecutors.directExecutor());
+ }
+ return result;
}
@Override
public Object getIdentifier() {
return "BatchedTransaction";
}
+
+ public void setSrcNodeId(NodeId srcNodeId) {
+ this.srcNodeId = srcNodeId;
+ }
+
+ public NodeId getSrcNodeId() {
+ return srcNodeId;
+ }
+
+ public boolean updateMetric() {
+ return updateMetric;
+ }
+
+ public void updateMetric(Boolean update) {
+ this.updateMetric = update;
+ }
}