import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
}
}
- public void merge(ShardResource shardResource, InstanceIdentifier identifier, DataObject updatedData) {
+ public ListenableFuture<Void> merge(ShardResource shardResource, InstanceIdentifier identifier,
+ DataObject updatedData) {
BlockingQueue<ActionableResource> queue = shardResource.getQueue();
if (queue != null) {
ActionableResource actResource = new ActionableResourceImpl(identifier.toString(),
identifier, ActionableResource.UPDATE, updatedData, null/*oldData*/);
queue.add(actResource);
+ return actResource.getResultFuture();
}
+ return Futures
+ .immediateFailedFuture(new IllegalStateException("Queue missing for provided shardResource "
+ + shardResource.name()));
}
public void merge(String resourceType, InstanceIdentifier identifier, DataObject updatedData) {
}
}
- public void delete(ShardResource shardResource, InstanceIdentifier identifier) {
+ public ListenableFuture<Void> delete(ShardResource shardResource, InstanceIdentifier identifier) {
BlockingQueue<ActionableResource> queue = shardResource.getQueue();
if (queue != null) {
ActionableResource actResource = new ActionableResourceImpl(identifier.toString(),
identifier, ActionableResource.DELETE, null, null/*oldData*/);
queue.add(actResource);
+ return actResource.getResultFuture();
}
+ return Futures
+ .immediateFailedFuture(new IllegalStateException("Queue missing for provided shardResource "
+ + shardResource.name()));
}
public void delete(String resourceType, InstanceIdentifier identifier) {
}
}
- public void put(ShardResource shardResource, InstanceIdentifier identifier, DataObject updatedData) {
+ public ListenableFuture<Void> put(ShardResource shardResource, InstanceIdentifier identifier,
+ DataObject updatedData) {
BlockingQueue<ActionableResource> queue = shardResource.getQueue();
if (queue != null) {
ActionableResource actResource = new ActionableResourceImpl(identifier.toString(),
identifier, ActionableResource.CREATE, updatedData, null/*oldData*/);
queue.add(actResource);
+ return actResource.getResultFuture();
}
+ return Futures
+ .immediateFailedFuture(new IllegalStateException("Queue missing for provided shardResource "
+ + shardResource.name()));
}
public void put(String resourceType, InstanceIdentifier identifier, DataObject updatedData) {
LogicalDatastoreType dsType = resHandler.getDatastoreType();
WriteTransaction tx = broker.newWriteOnlyTransaction();
List<SubTransaction> transactionObjects = new ArrayList<>();
+ Map<SubTransaction, SettableFuture> txMap = new HashMap();
for (ActionableResource actResource : actResourceList) {
+ int startSize = transactionObjects.size();
switch (actResource.getAction()) {
case ActionableResource.CREATE:
identifier = actResource.getInstanceIdentifier();
LOG.error("Unable to determine Action for ResourceType {} with ResourceKey {}",
resourceType, actResource.getKey());
}
+ int endSize = transactionObjects.size();
+ if (endSize > startSize) {
+ txMap.put(transactionObjects.get(endSize - 1), (SettableFuture) actResource.getResultFuture());
+ }
}
long start = System.currentTimeMillis();
try {
futures.get();
+ actResourceList.forEach((actionableResource) ->
+ ((SettableFuture) actionableResource.getResultFuture()).set(null));
long time = System.currentTimeMillis() - start;
LOG.trace("##### Time taken for {} = {}ms", actResourceList.size(), time);
.submit();
try {
futureOperation.get();
+ if (txMap.containsKey(object)) {
+ txMap.get(object).set(null);
+ } else {
+ LOG.error("Subtx object {} has no Actionable-resource associated with it !! ",
+ object.getInstanceIdentifier());
+ }
} catch (InterruptedException | ExecutionException exception) {
+ if (txMap.containsKey(object)) {
+ txMap.get(object).setException(exception);
+ }
LOG.error("Error {} to datastore (path, data) : ({}, {})", object.getAction(),
object.getInstanceIdentifier(), object.getInstance(), exception);
}