MDSAL module sync-up 08/42308/1
authorSuraj Ranjan <suraj.ranjan@ericsson.com>
Fri, 22 Jul 2016 11:54:39 +0000 (17:24 +0530)
committerSuraj Ranjan <suraj.ranjan@ericsson.com>
Fri, 22 Jul 2016 11:55:35 +0000 (17:25 +0530)
Providing a simple batching infrastructure
for modules that use MDSAL DataStores

Change-Id: Ifd8df37ef74b26d98da07d8a5c33c3ae93279e5c
Signed-off-by: Suraj Ranjan <suraj.ranjan@ericsson.com>
mdsalutil/mdsalutil-api/src/main/java/org/opendaylight/genius/utils/batching/ActionableResource.java [new file with mode: 0644]
mdsalutil/mdsalutil-api/src/main/java/org/opendaylight/genius/utils/batching/ActionableResourceImpl.java [new file with mode: 0644]
mdsalutil/mdsalutil-api/src/main/java/org/opendaylight/genius/utils/batching/ResourceBatchingManager.java [new file with mode: 0644]
mdsalutil/mdsalutil-api/src/main/java/org/opendaylight/genius/utils/batching/ResourceHandler.java [new file with mode: 0644]

diff --git a/mdsalutil/mdsalutil-api/src/main/java/org/opendaylight/genius/utils/batching/ActionableResource.java b/mdsalutil/mdsalutil-api/src/main/java/org/opendaylight/genius/utils/batching/ActionableResource.java
new file mode 100644 (file)
index 0000000..e7f3698
--- /dev/null
@@ -0,0 +1,27 @@
+/*
+ * Copyright (c) 2015 - 2016 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.genius.utils.batching;
+
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+public interface ActionableResource {
+    static final short CREATE = 1;
+    static final short UPDATE = 2;
+    static final short DELETE = 3;
+
+    InstanceIdentifier getInstanceIdentifier();
+    void setInstanceIdentifier(InstanceIdentifier identifier);
+    Object getInstance();
+    void setInstance(Object instance);
+    Object getOldInstance();
+    void setOldInstance(Object oldInstance);
+    short getAction();
+    void setAction(short action);
+    String getKey();
+    void setKey(String key);
+}
diff --git a/mdsalutil/mdsalutil-api/src/main/java/org/opendaylight/genius/utils/batching/ActionableResourceImpl.java b/mdsalutil/mdsalutil-api/src/main/java/org/opendaylight/genius/utils/batching/ActionableResourceImpl.java
new file mode 100644 (file)
index 0000000..72cb125
--- /dev/null
@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2015 - 2016 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.genius.utils.batching;
+
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+public class ActionableResourceImpl implements ActionableResource {
+    private Object instance;
+    private Object oldInstance;
+    private String key;
+    private InstanceIdentifier identifier;
+    private short action;
+
+    public ActionableResourceImpl(String key) {
+        this.key = key;
+    }
+
+    public void setInstance(Object instance) {
+        this.instance = instance;
+    }
+
+    public Object getInstance() {
+        return this.instance;
+    }
+
+    public void setOldInstance(Object oldInstance) {
+        this.oldInstance = oldInstance;
+    }
+
+    public Object getOldInstance() {
+        return this.oldInstance;
+    }
+
+    public void setInstanceIdentifier(InstanceIdentifier identifier) {
+        this.identifier = identifier;
+    }
+
+    public InstanceIdentifier getInstanceIdentifier() {
+        return this.identifier;
+    }
+
+    public void setAction(short action) {
+        this.action = action;
+    }
+
+    public short getAction(){
+        return action;
+    }
+
+    public void setKey(String key) {
+        this.key = key;
+    }
+
+    public String getKey() {
+        return this.key;
+    }
+}
diff --git a/mdsalutil/mdsalutil-api/src/main/java/org/opendaylight/genius/utils/batching/ResourceBatchingManager.java b/mdsalutil/mdsalutil-api/src/main/java/org/opendaylight/genius/utils/batching/ResourceBatchingManager.java
new file mode 100644 (file)
index 0000000..99792df
--- /dev/null
@@ -0,0 +1,189 @@
+/*
+ * Copyright (c) 2015 - 2016 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.genius.utils.batching;
+
+import com.google.common.util.concurrent.CheckedFuture;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+
+public class ResourceBatchingManager implements AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(ResourceBatchingManager.class);
+    private static final int INITIAL_DELAY = 3000;
+    private static final TimeUnit TIME_UNIT = TimeUnit.MILLISECONDS;
+
+    private DataBroker broker;
+    private ConcurrentHashMap<String, Pair<BlockingQueue, ResourceHandler>> resourceHandlerMapper = new ConcurrentHashMap();
+    private ConcurrentHashMap<String, ScheduledThreadPoolExecutor> resourceBatchingThreadMapper = new ConcurrentHashMap();
+
+    private static ResourceBatchingManager instance;
+
+    static {
+        instance = new ResourceBatchingManager();
+    }
+
+    public static ResourceBatchingManager getInstance() {
+        return instance;
+    }
+
+    @Override
+    public void close() throws Exception {
+        LOG.trace("ResourceBatchingManager Closed");
+    }
+
+    public void registerBatchableResource(String resourceType, final BlockingQueue<ActionableResource> resQueue, final ResourceHandler resHandler) {
+        resourceHandlerMapper.put(resourceType, new ImmutablePair<BlockingQueue, ResourceHandler>(resQueue, resHandler));
+        ScheduledThreadPoolExecutor resDelegatorService =(ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1);
+        resourceBatchingThreadMapper.put(resourceType, resDelegatorService);
+        if (resDelegatorService.getPoolSize() == 0 )
+            resDelegatorService.scheduleAtFixedRate(new Batcher(resourceType), INITIAL_DELAY, resHandler.getBatchInterval(), TIME_UNIT);
+    }
+
+    public void deregisterBatchableResource(String resourceType) {
+        resourceHandlerMapper.remove(resourceType);
+        resourceBatchingThreadMapper.remove(resourceType);
+    }
+
+    private class Batcher implements Runnable
+    {
+        private String resourceType;
+
+        Batcher(String resourceType) {
+            this.resourceType = resourceType;
+        }
+
+        public void run()
+        {
+            List<ActionableResource> resList = new ArrayList<>();
+
+            try
+            {
+                Pair<BlockingQueue, ResourceHandler> resMapper = resourceHandlerMapper.get(resourceType);
+                if (resMapper == null) {
+                    LOG.error("Unable to find resourceMapper for batching the ResourceType {}", resourceType);
+                    return;
+                }
+                BlockingQueue<ActionableResource> resQueue = resMapper.getLeft();
+                ResourceHandler resHandler = resMapper.getRight();
+                resList.add(resQueue.take());
+                resQueue.drainTo(resList);
+
+                long start = System.currentTimeMillis();
+                int batchSize = resHandler.getBatchSize();
+
+                int batches = resList.size()/ batchSize;
+                LOG.trace("Picked up size {} batches {}", resList.size(), batches);
+                if ( resList.size() > batchSize)
+                {
+                    for (int i = 0, j = 0; i < batches; j = j + batchSize,i++)
+                    {
+                        new MdsalDsTask<>(resourceType, resList.subList(j, j + batchSize)).process();
+                    }
+                    // process remaining routes
+                    LOG.trace("Picked up 1 size {} ", resList.subList(batches * batchSize, resList.size()).size());
+                    new MdsalDsTask<>(resourceType, resList.subList(batches * batchSize, resList.size())).process();
+                } else {
+                    // process less than OR == batchsize routes
+                    LOG.info("Picked up 2 size {}", resList.size());
+                    new MdsalDsTask<>(resourceType, resList).process();
+                }
+
+                long timetaken = System.currentTimeMillis() - start;
+                LOG.info( "Total time taken for resourceList of size: " + resList.size() + " ### time =  " + timetaken);
+
+            } catch (InterruptedException e)
+            {
+                e.printStackTrace();
+            }
+
+        }
+    }
+
+    private class MdsalDsTask<T extends DataObject>
+    {
+        String resourceType;
+        List<ActionableResource> actResourceList;
+
+        public MdsalDsTask(String resourceType, List<ActionableResource> actResourceList)
+        {
+            this.resourceType = resourceType;
+            this.actResourceList = actResourceList;
+        }
+
+        public void process() {
+            InstanceIdentifier<T> identifier;
+            Object instance;
+            try {
+                LOG.trace("Picked up 3 size {}", actResourceList.size());
+                Pair<BlockingQueue, ResourceHandler> resMapper = resourceHandlerMapper.get(resourceType);
+                if (resMapper == null) {
+                    LOG.error("Unable to find resourceMapper for batching the ResourceType {}", resourceType);
+                    return;
+                }
+                ResourceHandler resHandler = resMapper.getRight();
+                DataBroker broker = resHandler.getResourceBroker();
+                LogicalDatastoreType dsType = resHandler.getDatastoreType();
+                WriteTransaction tx = broker.newWriteOnlyTransaction();
+                for (ActionableResource actResource : actResourceList)
+                {
+                    switch (actResource.getAction()) {
+                        case ActionableResource.CREATE:
+                            identifier = actResource.getInstanceIdentifier();
+                            instance = actResource.getInstance();
+                            resHandler.create(tx, dsType, identifier, instance);
+                            break;
+                        case ActionableResource.UPDATE:
+                            identifier = actResource.getInstanceIdentifier();
+                            Object updated = actResource.getInstance();
+                            Object original = actResource.getOldInstance();
+                            resHandler.update(tx, dsType, identifier, original, updated);
+                            break;
+                        case ActionableResource.DELETE:
+                            identifier = actResource.getInstanceIdentifier();
+                            instance = actResource.getInstance();
+                            resHandler.delete(tx, dsType, identifier, instance);
+                            break;
+                        default:
+                            LOG.error("Unable to determine Action for ResourceType {} with ResourceKey {}", resourceType,
+                                    actResource.getKey());
+                    }
+                }
+
+                long start = System.currentTimeMillis();
+                CheckedFuture<Void, TransactionCommitFailedException> futures = tx.submit();
+
+                try
+                {
+                    futures.get();
+                    long time = System.currentTimeMillis() - start;
+
+                    LOG.info( " ##### Time taken for " + actResourceList.size() + " = " + time);
+
+                } catch (InterruptedException | ExecutionException e)
+                {
+                    LOG.error("Exception occurred while writing to datastore: " + e);
+                }
+
+            } catch (final Exception e)
+            {
+                LOG.error("Transaction submission failed: ", e);
+            }
+        }
+    }
+}
diff --git a/mdsalutil/mdsalutil-api/src/main/java/org/opendaylight/genius/utils/batching/ResourceHandler.java b/mdsalutil/mdsalutil-api/src/main/java/org/opendaylight/genius/utils/batching/ResourceHandler.java
new file mode 100644 (file)
index 0000000..e352d94
--- /dev/null
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2015 - 2016 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.genius.utils.batching;
+
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+public interface ResourceHandler {
+    void create(WriteTransaction tx, LogicalDatastoreType datastoreType, InstanceIdentifier identifer, Object vrfEntry);
+    void delete(WriteTransaction tx, LogicalDatastoreType datastoreType, InstanceIdentifier identifer, Object vrfEntry);
+    void update(WriteTransaction tx, LogicalDatastoreType datastoreType, InstanceIdentifier identifier, Object original,
+                Object update);
+    LogicalDatastoreType getDatastoreType();
+    int getBatchSize();
+    int getBatchInterval();
+    DataBroker getResourceBroker();
+}