/*
 * Copyright (c) 2015 - 2016 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.genius.utils.batching;
10 import com.google.common.base.Preconditions;
11 import com.google.common.util.concurrent.CheckedFuture;
12 import java.util.ArrayList;
13 import java.util.List;
14 import java.util.concurrent.BlockingQueue;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ExecutionException;
17 import java.util.concurrent.Executors;
18 import java.util.concurrent.ScheduledThreadPoolExecutor;
19 import java.util.concurrent.TimeUnit;
20 import org.apache.commons.lang3.tuple.ImmutablePair;
21 import org.apache.commons.lang3.tuple.Pair;
22 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
23 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
24 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
25 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
26 import org.opendaylight.yangtools.yang.binding.DataObject;
27 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
31 public class ResourceBatchingManager implements AutoCloseable {
33 private static final Logger LOG = LoggerFactory.getLogger(ResourceBatchingManager.class);
35 private static final int INITIAL_DELAY = 3000;
36 private static final TimeUnit TIME_UNIT = TimeUnit.MILLISECONDS;
38 private final ConcurrentHashMap<String, Pair<BlockingQueue, ResourceHandler>>
39 resourceHandlerMapper = new ConcurrentHashMap<>();
41 private final ConcurrentHashMap<String, ScheduledThreadPoolExecutor>
42 resourceBatchingThreadMapper = new ConcurrentHashMap<>();
44 private static ResourceBatchingManager instance;
47 instance = new ResourceBatchingManager();
50 public static ResourceBatchingManager getInstance() {
55 public void close() throws Exception {
56 LOG.trace("ResourceBatchingManager Closed");
59 public void registerBatchableResource(
60 String resourceType, final BlockingQueue<ActionableResource> resQueue, final ResourceHandler resHandler) {
61 Preconditions.checkNotNull(resQueue, "ResourceQueue to use for batching cannot not be null.");
62 Preconditions.checkNotNull(resHandler, "ResourceHandler cannot not be null.");
63 if (resourceHandlerMapper.contains(resourceType)) {
64 throw new RuntimeException("Resource type already registered");
66 resourceHandlerMapper.put(resourceType, new ImmutablePair<>(resQueue, resHandler));
67 ScheduledThreadPoolExecutor resDelegatorService = (ScheduledThreadPoolExecutor)
68 Executors.newScheduledThreadPool(1);
69 resourceBatchingThreadMapper.put(resourceType, resDelegatorService);
70 LOG.info("Registered resourceType {} with batchSize {} and batchInterval {}", resourceType,
71 resHandler.getBatchSize(), resHandler.getBatchInterval());
72 if (resDelegatorService.getPoolSize() == 0 ) {
73 resDelegatorService.scheduleWithFixedDelay(
74 new Batcher(resourceType), INITIAL_DELAY, resHandler.getBatchInterval(), TIME_UNIT);
78 public void put(String resourceType, InstanceIdentifier identifier, DataObject updatedData) {
79 BlockingQueue<ActionableResource> queue = getQueue(resourceType);
81 ActionableResource actResource = new ActionableResourceImpl(identifier.toString(),
82 identifier, ActionableResource.CREATE, updatedData, null/*oldData*/);
83 queue.add(actResource);
87 public void merge(String resourceType, InstanceIdentifier identifier, DataObject updatedData) {
88 BlockingQueue<ActionableResource> queue = getQueue(resourceType);
90 ActionableResource actResource = new ActionableResourceImpl(identifier.toString(),
91 identifier, ActionableResource.UPDATE, updatedData, null/*oldData*/);
92 queue.add(actResource);
96 public void delete(String resourceType, InstanceIdentifier identifier) {
97 BlockingQueue<ActionableResource> queue = getQueue(resourceType);
99 ActionableResource actResource = new ActionableResourceImpl(identifier.toString(),
100 identifier, ActionableResource.DELETE, null, null/*oldData*/);
101 queue.add(actResource);
105 private BlockingQueue<ActionableResource> getQueue(String resourceType) {
106 if (resourceHandlerMapper.containsKey(resourceType)) {
107 return resourceHandlerMapper.get(resourceType).getLeft();
113 public void deregisterBatchableResource(String resourceType) {
114 resourceHandlerMapper.remove(resourceType);
115 resourceBatchingThreadMapper.remove(resourceType);
118 private class Batcher implements Runnable {
119 private final String resourceType;
121 Batcher(String resourceType) {
122 this.resourceType = resourceType;
127 List<ActionableResource> resList = new ArrayList<>();
130 Pair<BlockingQueue, ResourceHandler> resMapper = resourceHandlerMapper.get(resourceType);
131 if (resMapper == null) {
132 LOG.error("Unable to find resourceMapper for batching the ResourceType {}", resourceType);
135 BlockingQueue<ActionableResource> resQueue = resMapper.getLeft();
136 ResourceHandler resHandler = resMapper.getRight();
137 resList.add(resQueue.take());
138 resQueue.drainTo(resList);
140 long start = System.currentTimeMillis();
141 int batchSize = resHandler.getBatchSize();
143 int batches = resList.size() / batchSize;
144 if (resList.size() > batchSize) {
145 LOG.info("Batched up resources of size {} into batches {} for resourcetype {}",
146 resList.size(), batches, resourceType);
147 for (int i = 0, j = 0; i < batches; j = j + batchSize,i++) {
148 new MdsalDsTask<>(resourceType, resList.subList(j, j + batchSize)).process();
150 // process remaining routes
151 LOG.trace("Picked up 1 size {} ", resList.subList(batches * batchSize, resList.size()).size());
152 new MdsalDsTask<>(resourceType, resList.subList(batches * batchSize, resList.size())).process();
154 // process less than OR == batchsize routes
155 LOG.trace("Picked up 2 size {}", resList.size());
156 new MdsalDsTask<>(resourceType, resList).process();
159 long timetaken = System.currentTimeMillis() - start;
160 LOG.info("Total taken ##time = {}ms for resourceList of size {} for resourceType {}",
161 timetaken, resList.size(), resourceType);
163 } catch (InterruptedException e) {
164 LOG.error("InterruptedException during run()", e);
170 private class MdsalDsTask<T extends DataObject> {
172 List<ActionableResource> actResourceList;
174 MdsalDsTask(String resourceType, List<ActionableResource> actResourceList) {
175 this.resourceType = resourceType;
176 this.actResourceList = actResourceList;
179 @SuppressWarnings("checkstyle:IllegalCatch")
180 public void process() {
181 InstanceIdentifier<T> identifier;
184 LOG.trace("Picked up 3 size {} of resourceType {}", actResourceList.size(), resourceType);
185 Pair<BlockingQueue, ResourceHandler> resMapper = resourceHandlerMapper.get(resourceType);
186 if (resMapper == null) {
187 LOG.error("Unable to find resourceMapper for batching the ResourceType {}", resourceType);
190 ResourceHandler resHandler = resMapper.getRight();
191 DataBroker broker = resHandler.getResourceBroker();
192 LogicalDatastoreType dsType = resHandler.getDatastoreType();
193 WriteTransaction tx = broker.newWriteOnlyTransaction();
194 List<SubTransaction> transactionObjects = new ArrayList<>();
195 for (ActionableResource actResource : actResourceList) {
196 switch (actResource.getAction()) {
197 case ActionableResource.CREATE:
198 identifier = actResource.getInstanceIdentifier();
199 instance = actResource.getInstance();
200 resHandler.create(tx, dsType, identifier, instance,transactionObjects);
202 case ActionableResource.UPDATE:
203 identifier = actResource.getInstanceIdentifier();
204 Object updated = actResource.getInstance();
205 Object original = actResource.getOldInstance();
206 resHandler.update(tx, dsType, identifier, original, updated,transactionObjects);
208 case ActionableResource.DELETE:
209 identifier = actResource.getInstanceIdentifier();
210 instance = actResource.getInstance();
211 resHandler.delete(tx, dsType, identifier, instance,transactionObjects);
214 LOG.error("Unable to determine Action for ResourceType {} with ResourceKey {}",
215 resourceType, actResource.getKey());
219 long start = System.currentTimeMillis();
220 CheckedFuture<Void, TransactionCommitFailedException> futures = tx.submit();
224 long time = System.currentTimeMillis() - start;
225 LOG.trace("##### Time taken for {} = {}ms", actResourceList.size(), time);
227 } catch (InterruptedException | ExecutionException e) {
228 LOG.error("Exception occurred while batch writing to datastore", e);
229 LOG.info("Trying to submit transaction operations one at a time for resType {}", resourceType);
230 for (SubTransaction object : transactionObjects) {
231 WriteTransaction writeTransaction = broker.newWriteOnlyTransaction();
232 switch (object.getAction()) {
233 case SubTransaction.CREATE :
234 writeTransaction.put(dsType, object.getInstanceIdentifier(),
235 (DataObject) object.getInstance(), true);
237 case SubTransaction.DELETE :
238 writeTransaction.delete(dsType, object.getInstanceIdentifier());
240 case SubTransaction.UPDATE :
241 writeTransaction.merge(dsType, object.getInstanceIdentifier(),
242 (DataObject) object.getInstance(), true);
245 LOG.error("Unable to determine Action for transaction object with id {}",
246 object.getInstanceIdentifier());
249 writeTransaction.submit().get();
250 } catch (InterruptedException | ExecutionException exception) {
251 LOG.error("Error {} to datastore (path, data) : ({}, {})", object.getAction(),
252 object.getInstanceIdentifier(), object.getInstance(), exception);
257 } catch (final Exception e) {
258 LOG.error("Transaction submission failed", e);