2 * Copyright (c) 2015 - 2018 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.genius.utils.batching;
10 import com.google.common.base.Preconditions;
11 import com.google.common.util.concurrent.FluentFuture;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import com.google.common.util.concurrent.SettableFuture;
17 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
18 import java.util.ArrayList;
19 import java.util.HashMap;
20 import java.util.List;
22 import java.util.Optional;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.concurrent.ScheduledExecutorService;
29 import java.util.concurrent.TimeUnit;
30 import org.apache.commons.lang3.tuple.ImmutablePair;
31 import org.apache.commons.lang3.tuple.Pair;
32 import org.eclipse.jdt.annotation.NonNull;
33 import org.opendaylight.infrautils.utils.concurrent.Executors;
34 import org.opendaylight.mdsal.binding.api.DataBroker;
35 import org.opendaylight.mdsal.binding.api.ReadTransaction;
36 import org.opendaylight.mdsal.binding.api.ReadWriteTransaction;
37 import org.opendaylight.mdsal.binding.api.WriteTransaction;
38 import org.opendaylight.mdsal.common.api.CommitInfo;
39 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
40 import org.opendaylight.mdsal.common.api.ReadFailedException;
41 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
42 import org.opendaylight.yangtools.yang.binding.DataObject;
43 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
48 * This class lets other modules submit their CRUD methods to it. This class
49 * will then supply a single transaction to such CRUD methods of the
50 * subscribers, on which such subscribers write data to that transaction.
51 * Finally the framework attempts to reliably write this single transaction
52 * which represents a batch of an ordered list of entities owned by that subscriber,
53 * to be written/updated/removed from a specific datastore as registered by the subscriber.
55 public class ResourceBatchingManager implements AutoCloseable {
57 private static final Logger LOG = LoggerFactory.getLogger(ResourceBatchingManager.class);
59 private static final int INITIAL_DELAY = 3000;
60 private static final TimeUnit TIME_UNIT = TimeUnit.MILLISECONDS;
62 private static final int PERIODICITY_IN_MS = 500;
63 private static final int BATCH_SIZE = 1000;
65 public enum ShardResource {
66 CONFIG_TOPOLOGY(LogicalDatastoreType.CONFIGURATION),
67 OPERATIONAL_TOPOLOGY(LogicalDatastoreType.OPERATIONAL),
68 CONFIG_INVENTORY(LogicalDatastoreType.CONFIGURATION),
69 OPERATIONAL_INVENTORY(LogicalDatastoreType.OPERATIONAL);
71 BlockingQueue<ActionableResource> queue = new LinkedBlockingQueue<>();
72 LogicalDatastoreType datastoreType;
74 ShardResource(LogicalDatastoreType datastoreType) {
75 this.datastoreType = datastoreType;
78 public LogicalDatastoreType getDatastoreType() {
82 BlockingQueue<ActionableResource> getQueue() {
87 private final ConcurrentHashMap<String, Pair<BlockingQueue<ActionableResource>, ResourceHandler>>
88 resourceHandlerMapper = new ConcurrentHashMap<>();
90 private final ConcurrentHashMap<String, ScheduledExecutorService>
91 resourceBatchingThreadMapper = new ConcurrentHashMap<>();
93 private final Map<String, Set<InstanceIdentifier<?>>> pendingModificationByResourceType = new ConcurrentHashMap<>();
95 private static ResourceBatchingManager instance;
98 instance = new ResourceBatchingManager();
101 public static ResourceBatchingManager getInstance() {
106 public void close() {
107 LOG.trace("ResourceBatchingManager Closed, closing all batched resources");
108 resourceBatchingThreadMapper.values().forEach(ScheduledExecutorService::shutdown);
111 public void registerBatchableResource(
112 String resourceType, final BlockingQueue<ActionableResource> resQueue, final ResourceHandler resHandler) {
113 Preconditions.checkNotNull(resQueue, "ResourceQueue to use for batching cannot not be null.");
114 Preconditions.checkNotNull(resHandler, "ResourceHandler cannot not be null.");
116 resourceHandlerMapper.put(resourceType, new ImmutablePair<>(resQueue, resHandler));
117 ScheduledExecutorService resDelegatorService =
118 Executors.newListeningScheduledThreadPool(1, "ResourceBatchingManager", LOG);
119 resourceBatchingThreadMapper.put(resourceType, resDelegatorService);
120 LOG.info("Registered resourceType {} with batchSize {} and batchInterval {}", resourceType,
121 resHandler.getBatchSize(), resHandler.getBatchInterval());
122 resDelegatorService.scheduleWithFixedDelay(
123 new Batcher(resourceType), resHandler.getBatchInterval(), resHandler.getBatchInterval(), TIME_UNIT);
124 pendingModificationByResourceType.putIfAbsent(resourceType, ConcurrentHashMap.newKeySet());
127 public void registerDefaultBatchHandlers(DataBroker broker) {
128 LOG.trace("Registering default batch handlers");
129 Integer batchSize = Integer.getInteger("resource.manager.batch.size", BATCH_SIZE);
130 Integer batchInterval = Integer.getInteger("resource.manager.batch.periodicity.ms", PERIODICITY_IN_MS);
132 for (ShardResource shardResource : ShardResource.values()) {
133 if (resourceHandlerMapper.containsKey(shardResource.name())) {
136 DefaultBatchHandler batchHandler = new DefaultBatchHandler(broker, shardResource.datastoreType, batchSize,
138 registerBatchableResource(shardResource.name(), shardResource.getQueue(), batchHandler);
142 private void beforeModification(String resoureType, InstanceIdentifier<?> iid) {
143 pendingModificationByResourceType.get(resoureType).add(iid);
146 @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
147 justification = "https://github.com/spotbugs/spotbugs/issues/811")
148 private void afterModification(String resoureType, InstanceIdentifier<?> iid) {
149 pendingModificationByResourceType.get(resoureType).remove(iid);
153 * Reads the identifier of the given resource type.
154 * Not to be used by the applications which uses their own resource queue
156 * @param resourceType resource type that was registered with batch manager
157 * @param identifier identifier to be read
158 * @param <T> DataObject subclass
159 * @return a CheckFuture containing the result of the read
161 public <T extends DataObject> FluentFuture<Optional<T>> read(
162 String resourceType, InstanceIdentifier<T> identifier) {
163 BlockingQueue<ActionableResource> queue = getQueue(resourceType);
165 if (pendingModificationByResourceType.get(resourceType).contains(identifier)) {
166 SettableFuture<Optional<T>> readFuture = SettableFuture.create();
167 queue.add(new ActionableReadResource<>(identifier, readFuture));
168 return FluentFuture.from(Futures.makeChecked(readFuture, ReadFailedException.MAPPER));
170 ResourceHandler resourceHandler = resourceHandlerMapper.get(resourceType).getRight();
171 try (ReadTransaction tx = resourceHandler.getResourceBroker().newReadOnlyTransaction()) {
172 return tx.read(resourceHandler.getDatastoreType(), identifier);
177 return FluentFutures.immediateFailedFluentFuture(new ReadFailedException(
178 "No batch handler was registered for resource " + resourceType));
181 public ListenableFuture<Void> merge(ShardResource shardResource, InstanceIdentifier<?> identifier,
182 DataObject updatedData) {
183 BlockingQueue<ActionableResource> queue = shardResource.getQueue();
185 beforeModification(shardResource.name(), identifier);
186 ActionableResource actResource = new ActionableResourceImpl(
187 identifier, ActionableResource.UPDATE, updatedData, null/*oldData*/);
188 queue.add(actResource);
189 return actResource.getResultFuture();
192 .immediateFailedFuture(new IllegalStateException("Queue missing for provided shardResource "
193 + shardResource.name()));
196 public void merge(String resourceType, InstanceIdentifier<?> identifier, DataObject updatedData) {
197 BlockingQueue<ActionableResource> queue = getQueue(resourceType);
199 beforeModification(resourceType, identifier);
200 ActionableResource actResource = new ActionableResourceImpl(
201 identifier, ActionableResource.UPDATE, updatedData, null/*oldData*/);
202 queue.add(actResource);
206 public ListenableFuture<Void> delete(ShardResource shardResource, InstanceIdentifier<?> identifier) {
207 BlockingQueue<ActionableResource> queue = shardResource.getQueue();
209 beforeModification(shardResource.name(), identifier);
210 ActionableResource actResource = new ActionableResourceImpl(
211 identifier, ActionableResource.DELETE, null, null/*oldData*/);
212 queue.add(actResource);
213 return actResource.getResultFuture();
216 .immediateFailedFuture(new IllegalStateException("Queue missing for provided shardResource "
217 + shardResource.name()));
220 public void delete(String resourceType, InstanceIdentifier<?> identifier) {
221 BlockingQueue<ActionableResource> queue = getQueue(resourceType);
223 beforeModification(resourceType, identifier);
224 ActionableResource actResource = new ActionableResourceImpl(
225 identifier, ActionableResource.DELETE, null, null/*oldData*/);
226 queue.add(actResource);
230 public ListenableFuture<Void> put(ShardResource shardResource, InstanceIdentifier<?> identifier,
231 DataObject updatedData) {
232 BlockingQueue<ActionableResource> queue = shardResource.getQueue();
234 beforeModification(shardResource.name(), identifier);
235 ActionableResource actResource = new ActionableResourceImpl(
236 identifier, ActionableResource.CREATE, updatedData, null/*oldData*/);
237 queue.add(actResource);
238 return actResource.getResultFuture();
241 .immediateFailedFuture(new IllegalStateException("Queue missing for provided shardResource "
242 + shardResource.name()));
245 public void put(String resourceType, InstanceIdentifier<?> identifier, DataObject updatedData) {
246 BlockingQueue<ActionableResource> queue = getQueue(resourceType);
248 beforeModification(resourceType, identifier);
249 ActionableResource actResource = new ActionableResourceImpl(
250 identifier, ActionableResource.CREATE, updatedData, null/*oldData*/);
251 queue.add(actResource);
255 private BlockingQueue<ActionableResource> getQueue(String resourceType) {
256 if (resourceHandlerMapper.containsKey(resourceType)) {
257 return resourceHandlerMapper.get(resourceType).getLeft();
262 public void deregisterBatchableResource(String resourceType) {
263 ScheduledExecutorService scheduledThreadPoolExecutor = resourceBatchingThreadMapper.get(resourceType);
264 if (scheduledThreadPoolExecutor != null) {
265 scheduledThreadPoolExecutor.shutdown();
267 resourceHandlerMapper.remove(resourceType);
268 resourceBatchingThreadMapper.remove(resourceType);
271 private class Batcher implements Runnable {
272 private final String resourceType;
274 Batcher(String resourceType) {
275 this.resourceType = resourceType;
280 List<ActionableResource> resList = new ArrayList<>();
283 Pair<BlockingQueue<ActionableResource>, ResourceHandler> resMapper =
284 resourceHandlerMapper.get(resourceType);
285 if (resMapper == null) {
286 LOG.error("Unable to find resourceMapper for batching the ResourceType {}", resourceType);
289 BlockingQueue<ActionableResource> resQueue = resMapper.getLeft();
290 ResourceHandler resHandler = resMapper.getRight();
291 resList.add(resQueue.take());
292 resQueue.drainTo(resList);
294 long start = System.currentTimeMillis();
295 int batchSize = resHandler.getBatchSize();
297 int batches = resList.size() / batchSize;
298 if (resList.size() > batchSize) {
299 LOG.info("Batched up resources of size {} into batches {} for resourcetype {}",
300 resList.size(), batches, resourceType);
301 for (int i = 0, j = 0; i < batches; j = j + batchSize,i++) {
302 new MdsalDsTask<>(resourceType, resList.subList(j, j + batchSize)).process();
304 // process remaining routes
305 LOG.trace("Picked up 1 size {} ", resList.subList(batches * batchSize, resList.size()).size());
306 new MdsalDsTask<>(resourceType, resList.subList(batches * batchSize, resList.size())).process();
308 // process less than OR == batchsize routes
309 LOG.trace("Picked up 2 size {}", resList.size());
310 new MdsalDsTask<>(resourceType, resList).process();
313 long timetaken = System.currentTimeMillis() - start;
314 LOG.debug("Total taken ##time = {}ms for resourceList of size {} for resourceType {}",
315 timetaken, resList.size(), resourceType);
317 } catch (InterruptedException e) {
318 LOG.error("InterruptedException during run()", e);
324 private class MdsalDsTask<T extends DataObject> {
326 List<ActionableResource> actResourceList;
328 MdsalDsTask(String resourceType, List<ActionableResource> actResourceList) {
329 this.resourceType = resourceType;
330 this.actResourceList = actResourceList;
333 @SuppressWarnings("unchecked")
334 public void process() {
335 LOG.trace("Picked up 3 size {} of resourceType {}", actResourceList.size(), resourceType);
336 Pair<BlockingQueue<ActionableResource>, ResourceHandler> resMapper =
337 resourceHandlerMapper.get(resourceType);
338 if (resMapper == null) {
339 LOG.error("Unable to find resourceMapper for batching the ResourceType {}", resourceType);
342 ResourceHandler resHandler = resMapper.getRight();
343 DataBroker broker = resHandler.getResourceBroker();
344 LogicalDatastoreType dsType = resHandler.getDatastoreType();
345 ReadWriteTransaction tx = broker.newReadWriteTransaction();
346 List<SubTransaction> transactionObjects = new ArrayList<>();
347 Map<SubTransaction, SettableFuture<Void>> txMap = new HashMap<>();
348 for (ActionableResource actResource : actResourceList) {
349 int startSize = transactionObjects.size();
350 switch (actResource.getAction()) {
351 case ActionableResource.CREATE:
352 resHandler.create(tx, dsType, actResource.getInstanceIdentifier(), actResource.getInstance(),
355 case ActionableResource.UPDATE:
356 Object updated = actResource.getInstance();
357 Object original = actResource.getOldInstance();
358 resHandler.update(tx, dsType, actResource.getInstanceIdentifier(), original,
359 updated,transactionObjects);
361 case ActionableResource.UPDATECONTAINER:
362 Object updatedContainer = actResource.getInstance();
363 Object originalContainer = actResource.getOldInstance();
364 resHandler.updateContainer(tx, dsType, actResource.getInstanceIdentifier(),
365 originalContainer, updatedContainer,transactionObjects);
367 case ActionableResource.DELETE:
368 resHandler.delete(tx, dsType, actResource.getInstanceIdentifier(), actResource.getInstance(),
371 case ActionableResource.READ:
372 ActionableReadResource<DataObject> readAction = (ActionableReadResource<DataObject>)actResource;
373 ListenableFuture<Optional<DataObject>> future =
374 tx.read(dsType, readAction.getInstanceIdentifier());
375 Futures.addCallback(future, new FutureCallback<Optional<DataObject>>() {
377 public void onSuccess(Optional<DataObject> result) {
378 readAction.getReadFuture().set(result);
382 public void onFailure(Throwable failure) {
383 readAction.getReadFuture().setException(failure);
385 }, MoreExecutors.directExecutor());
388 LOG.error("Unable to determine Action for ResourceType {} with ResourceKey {}",
389 resourceType, actResource);
391 int endSize = transactionObjects.size();
392 if (endSize > startSize) {
393 txMap.put(transactionObjects.get(endSize - 1),
394 (SettableFuture<Void>) actResource.getResultFuture());
399 long start = System.currentTimeMillis();
400 FluentFuture<? extends @NonNull CommitInfo> futures = tx.commit();
404 actResourceList.forEach(actionableResource -> {
405 ((SettableFuture<Void>) actionableResource.getResultFuture()).set(null);
406 postCommit(actionableResource.getAction(), actionableResource.getInstanceIdentifier());
408 long time = System.currentTimeMillis() - start;
409 LOG.trace("##### Time taken for {} = {}ms", actResourceList.size(), time);
411 } catch (InterruptedException | ExecutionException e) {
412 LOG.error("Exception occurred while batch writing to datastore", e);
413 LOG.info("Trying to submit transaction operations one at a time for resType {}", resourceType);
414 for (SubTransaction object : transactionObjects) {
415 WriteTransaction writeTransaction = broker.newWriteOnlyTransaction();
416 switch (object.getAction()) {
417 case SubTransaction.CREATE:
418 writeTransaction.put(dsType, object.getInstanceIdentifier(),
419 (DataObject) object.getInstance(), true);
421 case SubTransaction.DELETE:
422 writeTransaction.delete(dsType, object.getInstanceIdentifier());
424 case SubTransaction.UPDATE:
425 writeTransaction.merge(dsType, object.getInstanceIdentifier(),
426 (DataObject) object.getInstance(), true);
429 LOG.error("Unable to determine Action for transaction object with id {}",
430 object.getInstanceIdentifier());
432 FluentFuture<? extends @NonNull CommitInfo> futureOperation = writeTransaction.commit();
434 futureOperation.get();
435 if (txMap.containsKey(object)) {
436 txMap.get(object).set(null);
438 LOG.error("Subtx object {} has no Actionable-resource associated with it !! ",
439 object.getInstanceIdentifier());
441 } catch (InterruptedException | ExecutionException exception) {
442 if (txMap.containsKey(object)) {
443 txMap.get(object).setException(exception);
445 LOG.error("Error {} to datastore (path, data) : ({}, {})", object.getAction(),
446 object.getInstanceIdentifier(), object.getInstance(), exception);
448 postCommit(object.getAction(), object.getInstanceIdentifier());
454 private void postCommit(int action, InstanceIdentifier iid) {
456 case ActionableResource.CREATE:
457 case ActionableResource.UPDATE:
458 case ActionableResource.DELETE:
459 afterModification(resourceType, iid);
467 private static class ActionableReadResource<T extends DataObject> extends ActionableResourceImpl {
468 private final SettableFuture<Optional<T>> readFuture;
470 ActionableReadResource(InstanceIdentifier<T> identifier, SettableFuture<Optional<T>> readFuture) {
471 super(identifier, ActionableResource.READ, null, null);
472 this.readFuture = readFuture;
475 SettableFuture<Optional<T>> getReadFuture() {