Bug 8938 - Resource-batch manager enhancement 60/61260/10
authoreaksahu <a.k.sahu@ericsson.com>
Mon, 7 Aug 2017 10:09:39 +0000 (15:39 +0530)
committerFaseela K <faseela.k@ericsson.com>
Tue, 22 Aug 2017 11:28:39 +0000 (11:28 +0000)
Adding capabilities to return Futures in Resource batch manager calls.
This fetures helps the caller to know whether the submitted tx job has
been successful or not.
And upon success or failure , the caller can take more corrective
action.

Change-Id: Iec5a08721979cfa15a34617fb1ccea0065853c84
Signed-off-by: eaksahu <a.k.sahu@ericsson.com>
mdsalutil/mdsalutil-api/src/main/java/org/opendaylight/genius/utils/batching/ActionableResource.java
mdsalutil/mdsalutil-api/src/main/java/org/opendaylight/genius/utils/batching/ActionableResourceImpl.java
mdsalutil/mdsalutil-api/src/main/java/org/opendaylight/genius/utils/batching/ResourceBatchingManager.java

index 1807401d429fa9a7654e3389fb77a933f44712fb..7bbbc5ec02800f9f11dbe5e211cee174ce355cdc 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.genius.utils.batching;
 
+import com.google.common.util.concurrent.ListenableFuture;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 
 public interface ActionableResource {
@@ -34,4 +35,6 @@ public interface ActionableResource {
     String getKey();
 
     void setKey(String key);
+
+    ListenableFuture<Void> getResultFuture();
 }
index 44ff6fde6eb8328a1ff96a4b96edf43e43b247cb..2e54d5d46e1acf70050fb980728fc47b3ebe8eff 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.genius.utils.batching;
 
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 
 public class ActionableResourceImpl implements ActionableResource {
@@ -15,6 +17,7 @@ public class ActionableResourceImpl implements ActionableResource {
     private String key;
     private InstanceIdentifier identifier;
     private short action;
+    private final SettableFuture future = SettableFuture.create();
 
     public ActionableResourceImpl(String key) {
         this.key = key;
@@ -78,4 +81,9 @@ public class ActionableResourceImpl implements ActionableResource {
     public String getKey() {
         return this.key;
     }
+
+    @Override
+    public ListenableFuture<Void> getResultFuture() {
+        return future;
+    }
 }
index 980c217e3224c45d3278eb36a7cc153393348b65..b5c944604ce1c5c1468c7d2943894dcddbdfb723 100644 (file)
@@ -9,8 +9,13 @@ package org.opendaylight.genius.utils.batching;
 
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
@@ -18,6 +23,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
@@ -127,13 +133,18 @@ public class ResourceBatchingManager implements AutoCloseable {
         }
     }
 
-    public void merge(ShardResource shardResource, InstanceIdentifier identifier, DataObject updatedData) {
+    public ListenableFuture<Void> merge(ShardResource shardResource, InstanceIdentifier identifier,
+                                        DataObject updatedData) {
         BlockingQueue<ActionableResource> queue = shardResource.getQueue();
         if (queue != null) {
             ActionableResource actResource = new ActionableResourceImpl(identifier.toString(),
                     identifier, ActionableResource.UPDATE, updatedData, null/*oldData*/);
             queue.add(actResource);
+            return actResource.getResultFuture();
         }
+        return Futures
+                .immediateFailedFuture(new IllegalStateException("Queue missing for provided shardResource "
+                        + shardResource.name()));
     }
 
     public void merge(String resourceType, InstanceIdentifier identifier, DataObject updatedData) {
@@ -145,13 +156,17 @@ public class ResourceBatchingManager implements AutoCloseable {
         }
     }
 
-    public void delete(ShardResource shardResource, InstanceIdentifier identifier) {
+    public ListenableFuture<Void> delete(ShardResource shardResource, InstanceIdentifier identifier) {
         BlockingQueue<ActionableResource> queue = shardResource.getQueue();
         if (queue != null) {
             ActionableResource actResource = new ActionableResourceImpl(identifier.toString(),
                     identifier, ActionableResource.DELETE, null, null/*oldData*/);
             queue.add(actResource);
+            return actResource.getResultFuture();
         }
+        return Futures
+                .immediateFailedFuture(new IllegalStateException("Queue missing for provided shardResource "
+                        + shardResource.name()));
     }
 
     public void delete(String resourceType, InstanceIdentifier identifier) {
@@ -163,13 +178,18 @@ public class ResourceBatchingManager implements AutoCloseable {
         }
     }
 
-    public void put(ShardResource shardResource, InstanceIdentifier identifier, DataObject updatedData) {
+    public ListenableFuture<Void> put(ShardResource shardResource, InstanceIdentifier identifier,
+                                      DataObject updatedData) {
         BlockingQueue<ActionableResource> queue = shardResource.getQueue();
         if (queue != null) {
             ActionableResource actResource = new ActionableResourceImpl(identifier.toString(),
                     identifier, ActionableResource.CREATE, updatedData, null/*oldData*/);
             queue.add(actResource);
+            return actResource.getResultFuture();
         }
+        return Futures
+                .immediateFailedFuture(new IllegalStateException("Queue missing for provided shardResource "
+                        + shardResource.name()));
     }
 
     public void put(String resourceType, InstanceIdentifier identifier, DataObject updatedData) {
@@ -271,7 +291,9 @@ public class ResourceBatchingManager implements AutoCloseable {
             LogicalDatastoreType dsType = resHandler.getDatastoreType();
             WriteTransaction tx = broker.newWriteOnlyTransaction();
             List<SubTransaction> transactionObjects = new ArrayList<>();
+            Map<SubTransaction, SettableFuture> txMap = new HashMap();
             for (ActionableResource actResource : actResourceList) {
+                int startSize = transactionObjects.size();
                 switch (actResource.getAction()) {
                     case ActionableResource.CREATE:
                         identifier = actResource.getInstanceIdentifier();
@@ -293,6 +315,10 @@ public class ResourceBatchingManager implements AutoCloseable {
                         LOG.error("Unable to determine Action for ResourceType {} with ResourceKey {}",
                                 resourceType, actResource.getKey());
                 }
+                int endSize = transactionObjects.size();
+                if (endSize > startSize) {
+                    txMap.put(transactionObjects.get(endSize - 1), (SettableFuture) actResource.getResultFuture());
+                }
             }
 
             long start = System.currentTimeMillis();
@@ -300,6 +326,8 @@ public class ResourceBatchingManager implements AutoCloseable {
 
             try {
                 futures.get();
+                actResourceList.forEach((actionableResource) ->
+                        ((SettableFuture) actionableResource.getResultFuture()).set(null));
                 long time = System.currentTimeMillis() - start;
                 LOG.trace("##### Time taken for {} = {}ms", actResourceList.size(), time);
 
@@ -328,7 +356,16 @@ public class ResourceBatchingManager implements AutoCloseable {
                             .submit();
                     try {
                         futureOperation.get();
+                        if (txMap.containsKey(object)) {
+                            txMap.get(object).set(null);
+                        } else {
+                            LOG.error("Subtx object {} has no Actionable-resource associated with it !! ",
+                                    object.getInstanceIdentifier());
+                        }
                     } catch (InterruptedException | ExecutionException exception) {
+                        if (txMap.containsKey(object)) {
+                            txMap.get(object).setException(exception);
+                        }
                         LOG.error("Error {} to datastore (path, data) : ({}, {})", object.getAction(),
                                 object.getInstanceIdentifier(), object.getInstance(), exception);
                     }