/*
 * Copyright (c) 2015 - 2016 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.genius.utils.batching;
10 import com.google.common.base.Preconditions;
11 import com.google.common.util.concurrent.CheckedFuture;
12 import org.apache.commons.lang3.tuple.ImmutablePair;
13 import org.apache.commons.lang3.tuple.Pair;
14 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
15 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
16 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
17 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
18 import org.opendaylight.yangtools.yang.binding.DataObject;
19 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
23 import java.util.ArrayList;
24 import java.util.List;
25 import java.util.concurrent.*;
27 public class ResourceBatchingManager implements AutoCloseable {
28 private static final Logger LOG = LoggerFactory.getLogger(ResourceBatchingManager.class);
29 private static final int INITIAL_DELAY = 3000;
30 private static final TimeUnit TIME_UNIT = TimeUnit.MILLISECONDS;
32 private DataBroker broker;
33 private ConcurrentHashMap<String, Pair<BlockingQueue, ResourceHandler>> resourceHandlerMapper = new ConcurrentHashMap();
34 private ConcurrentHashMap<String, ScheduledThreadPoolExecutor> resourceBatchingThreadMapper = new ConcurrentHashMap();
36 private static ResourceBatchingManager instance;
39 instance = new ResourceBatchingManager();
/**
 * Static accessor for the shared {@code ResourceBatchingManager}.
 *
 * NOTE(review): only the signature is visible in this extract; presumably the body returns the
 * static {@code instance} field - confirm against the full file.
 */
42 public static ResourceBatchingManager getInstance() {
47 public void close() throws Exception {
48 LOG.trace("ResourceBatchingManager Closed");
51 public void registerBatchableResource(String resourceType, final BlockingQueue<ActionableResource> resQueue, final ResourceHandler resHandler) {
52 Preconditions.checkNotNull(resQueue, "ResourceQueue to use for batching cannot not be null.");
53 Preconditions.checkNotNull(resHandler, "ResourceHandler cannot not be null.");
54 resourceHandlerMapper.put(resourceType, new ImmutablePair<BlockingQueue, ResourceHandler>(resQueue, resHandler));
55 ScheduledThreadPoolExecutor resDelegatorService =(ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1);
56 resourceBatchingThreadMapper.put(resourceType, resDelegatorService);
57 LOG.info("Registered resourceType {} with batchSize {} and batchInterval {}", resourceType,
58 resHandler.getBatchSize(), resHandler.getBatchInterval());
59 if (resDelegatorService.getPoolSize() == 0 )
60 resDelegatorService.scheduleWithFixedDelay(new Batcher(resourceType), INITIAL_DELAY, resHandler.getBatchInterval(), TIME_UNIT);
63 public void deregisterBatchableResource(String resourceType) {
64 resourceHandlerMapper.remove(resourceType);
65 resourceBatchingThreadMapper.remove(resourceType);
68 private class Batcher implements Runnable
70 private String resourceType;
72 Batcher(String resourceType) {
73 this.resourceType = resourceType;
78 List<ActionableResource> resList = new ArrayList<>();
82 Pair<BlockingQueue, ResourceHandler> resMapper = resourceHandlerMapper.get(resourceType);
83 if (resMapper == null) {
84 LOG.error("Unable to find resourceMapper for batching the ResourceType {}", resourceType);
87 BlockingQueue<ActionableResource> resQueue = resMapper.getLeft();
88 ResourceHandler resHandler = resMapper.getRight();
89 resList.add(resQueue.take());
90 resQueue.drainTo(resList);
92 long start = System.currentTimeMillis();
93 int batchSize = resHandler.getBatchSize();
95 int batches = resList.size()/ batchSize;
96 if ( resList.size() > batchSize)
98 LOG.info("Batched up resources of size {} into batches {} for resourcetype {}", resList.size(), batches, resourceType);
99 for (int i = 0, j = 0; i < batches; j = j + batchSize,i++)
101 new MdsalDsTask<>(resourceType, resList.subList(j, j + batchSize)).process();
103 // process remaining routes
104 LOG.trace("Picked up 1 size {} ", resList.subList(batches * batchSize, resList.size()).size());
105 new MdsalDsTask<>(resourceType, resList.subList(batches * batchSize, resList.size())).process();
107 // process less than OR == batchsize routes
108 LOG.trace("Picked up 2 size {}", resList.size());
109 new MdsalDsTask<>(resourceType, resList).process();
112 long timetaken = System.currentTimeMillis() - start;
113 LOG.info( "Total taken ##time = {}ms for resourceList of size {} for resourceType {}", timetaken, resList.size(), resourceType);
115 } catch (InterruptedException e)
123 private class MdsalDsTask<T extends DataObject>
126 List<ActionableResource> actResourceList;
128 public MdsalDsTask(String resourceType, List<ActionableResource> actResourceList)
130 this.resourceType = resourceType;
131 this.actResourceList = actResourceList;
134 public void process() {
135 InstanceIdentifier<T> identifier;
138 LOG.trace("Picked up 3 size {} of resourceType {}", actResourceList.size(), resourceType);
139 Pair<BlockingQueue, ResourceHandler> resMapper = resourceHandlerMapper.get(resourceType);
140 if (resMapper == null) {
141 LOG.error("Unable to find resourceMapper for batching the ResourceType {}", resourceType);
144 ResourceHandler resHandler = resMapper.getRight();
145 DataBroker broker = resHandler.getResourceBroker();
146 LogicalDatastoreType dsType = resHandler.getDatastoreType();
147 WriteTransaction tx = broker.newWriteOnlyTransaction();
148 for (ActionableResource actResource : actResourceList)
150 switch (actResource.getAction()) {
151 case ActionableResource.CREATE:
152 identifier = actResource.getInstanceIdentifier();
153 instance = actResource.getInstance();
154 resHandler.create(tx, dsType, identifier, instance);
156 case ActionableResource.UPDATE:
157 identifier = actResource.getInstanceIdentifier();
158 Object updated = actResource.getInstance();
159 Object original = actResource.getOldInstance();
160 resHandler.update(tx, dsType, identifier, original, updated);
162 case ActionableResource.DELETE:
163 identifier = actResource.getInstanceIdentifier();
164 instance = actResource.getInstance();
165 resHandler.delete(tx, dsType, identifier, instance);
168 LOG.error("Unable to determine Action for ResourceType {} with ResourceKey {}", resourceType,
169 actResource.getKey());
173 long start = System.currentTimeMillis();
174 CheckedFuture<Void, TransactionCommitFailedException> futures = tx.submit();
179 long time = System.currentTimeMillis() - start;
181 LOG.trace( " ##### Time taken for " + actResourceList.size() + " = " + time + "ms");
183 } catch (InterruptedException | ExecutionException e)
185 LOG.error("Exception occurred while writing to datastore: " + e);
188 } catch (final Exception e)
190 LOG.error("Transaction submission failed: ", e);