2 * Copyright (c) 2015 - 2018 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.genius.utils.batching;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.util.concurrent.FluentFuture;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import com.google.common.util.concurrent.SettableFuture;
18 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
19 import java.util.ArrayList;
20 import java.util.HashMap;
21 import java.util.List;
23 import java.util.Optional;
25 import java.util.concurrent.BlockingQueue;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.LinkedBlockingQueue;
29 import java.util.concurrent.ScheduledExecutorService;
30 import java.util.concurrent.TimeUnit;
31 import org.apache.commons.lang3.tuple.ImmutablePair;
32 import org.apache.commons.lang3.tuple.Pair;
33 import org.eclipse.jdt.annotation.NonNull;
34 import org.opendaylight.infrautils.utils.concurrent.Executors;
35 import org.opendaylight.mdsal.binding.api.DataBroker;
36 import org.opendaylight.mdsal.binding.api.ReadTransaction;
37 import org.opendaylight.mdsal.binding.api.ReadWriteTransaction;
38 import org.opendaylight.mdsal.binding.api.WriteTransaction;
39 import org.opendaylight.mdsal.common.api.CommitInfo;
40 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
41 import org.opendaylight.mdsal.common.api.ReadFailedException;
42 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
43 import org.opendaylight.yangtools.yang.binding.DataObject;
44 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
49 * This class lets other modules submit their CRUD methods to it. This class
50 * will then supply a single transaction to such CRUD methods of the
51 * subscribers, on which such subscribers write data to that transaction.
52 * Finally the framework attempts to reliably write this single transaction
53 * which represents a batch of an ordered list of entities owned by that subscriber,
54 * to be written/updated/removed from a specific datastore as registered by the subscriber.
56 public class ResourceBatchingManager implements AutoCloseable {
58 private static final Logger LOG = LoggerFactory.getLogger(ResourceBatchingManager.class);
60 private static final int INITIAL_DELAY = 3000;
61 private static final TimeUnit TIME_UNIT = TimeUnit.MILLISECONDS;
63 private static final int PERIODICITY_IN_MS = 500;
64 private static final int BATCH_SIZE = 1000;
66 public enum ShardResource {
67 CONFIG_TOPOLOGY(LogicalDatastoreType.CONFIGURATION),
68 OPERATIONAL_TOPOLOGY(LogicalDatastoreType.OPERATIONAL),
69 CONFIG_INVENTORY(LogicalDatastoreType.CONFIGURATION),
70 OPERATIONAL_INVENTORY(LogicalDatastoreType.OPERATIONAL);
72 BlockingQueue<ActionableResource<?>> queue = new LinkedBlockingQueue<>();
73 LogicalDatastoreType datastoreType;
75 ShardResource(LogicalDatastoreType datastoreType) {
76 this.datastoreType = datastoreType;
79 public LogicalDatastoreType getDatastoreType() {
83 BlockingQueue<ActionableResource<?>> getQueue() {
88 private final ConcurrentHashMap<String, Pair<BlockingQueue<ActionableResource<?>>, ResourceHandler>>
89 resourceHandlerMapper = new ConcurrentHashMap<>();
91 private final ConcurrentHashMap<String, ScheduledExecutorService>
92 resourceBatchingThreadMapper = new ConcurrentHashMap<>();
94 private final Map<String, Set<InstanceIdentifier<?>>> pendingModificationByResourceType = new ConcurrentHashMap<>();
96 private static ResourceBatchingManager instance;
99 instance = new ResourceBatchingManager();
102 public static ResourceBatchingManager getInstance() {
107 public void close() {
108 LOG.trace("ResourceBatchingManager Closed, closing all batched resources");
109 resourceBatchingThreadMapper.values().forEach(ScheduledExecutorService::shutdown);
112 public void registerBatchableResource(final String resourceType,
113 final BlockingQueue<ActionableResource<?>> resQueue, final ResourceHandler resHandler) {
114 requireNonNull(resQueue, "ResourceQueue to use for batching cannot not be null.");
115 requireNonNull(resHandler, "ResourceHandler cannot not be null.");
117 resourceHandlerMapper.put(resourceType, new ImmutablePair<>(resQueue, resHandler));
118 ScheduledExecutorService resDelegatorService =
119 Executors.newListeningScheduledThreadPool(1, "ResourceBatchingManager", LOG);
120 resourceBatchingThreadMapper.put(resourceType, resDelegatorService);
121 LOG.info("Registered resourceType {} with batchSize {} and batchInterval {}", resourceType,
122 resHandler.getBatchSize(), resHandler.getBatchInterval());
123 resDelegatorService.scheduleWithFixedDelay(
124 new Batcher(resourceType), resHandler.getBatchInterval(), resHandler.getBatchInterval(), TIME_UNIT);
125 pendingModificationByResourceType.putIfAbsent(resourceType, ConcurrentHashMap.newKeySet());
128 public void registerDefaultBatchHandlers(DataBroker broker) {
129 LOG.trace("Registering default batch handlers");
130 Integer batchSize = Integer.getInteger("resource.manager.batch.size", BATCH_SIZE);
131 Integer batchInterval = Integer.getInteger("resource.manager.batch.periodicity.ms", PERIODICITY_IN_MS);
133 for (ShardResource shardResource : ShardResource.values()) {
134 if (resourceHandlerMapper.containsKey(shardResource.name())) {
137 DefaultBatchHandler batchHandler = new DefaultBatchHandler(broker, shardResource.datastoreType, batchSize,
139 registerBatchableResource(shardResource.name(), shardResource.getQueue(), batchHandler);
143 private void beforeModification(String resoureType, InstanceIdentifier<?> iid) {
144 pendingModificationByResourceType.get(resoureType).add(iid);
147 @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
148 justification = "https://github.com/spotbugs/spotbugs/issues/811")
149 private void afterModification(String resoureType, InstanceIdentifier<?> iid) {
150 pendingModificationByResourceType.get(resoureType).remove(iid);
154 * Reads the identifier of the given resource type.
155 * Not to be used by the applications which uses their own resource queue
157 * @param resourceType resource type that was registered with batch manager
158 * @param identifier identifier to be read
159 * @param <T> DataObject subclass
160 * @return a CheckFuture containing the result of the read
162 public <T extends DataObject> FluentFuture<Optional<T>> read(
163 String resourceType, InstanceIdentifier<T> identifier) throws InterruptedException, ExecutionException {
164 BlockingQueue<ActionableResource<?>> queue = getQueue(resourceType);
166 if (pendingModificationByResourceType.get(resourceType).contains(identifier)) {
167 SettableFuture<Optional<T>> readFuture = SettableFuture.create();
168 queue.add(new ActionableReadResource<>(identifier, readFuture));
169 return FluentFutures.immediateFluentFuture(readFuture.get());
171 ResourceHandler resourceHandler = resourceHandlerMapper.get(resourceType).getRight();
172 try (ReadTransaction tx = resourceHandler.getResourceBroker().newReadOnlyTransaction()) {
173 return tx.read(resourceHandler.getDatastoreType(), identifier);
178 return FluentFutures.immediateFailedFluentFuture(new ReadFailedException(
179 "No batch handler was registered for resource " + resourceType));
182 public ListenableFuture<Void> merge(ShardResource shardResource, InstanceIdentifier<?> identifier,
183 DataObject updatedData) {
184 BlockingQueue<ActionableResource<?>> queue = shardResource.getQueue();
186 beforeModification(shardResource.name(), identifier);
187 ActionableResource<?> actResource = new ActionableResourceImpl<>(
188 identifier, ActionableResource.UPDATE, updatedData, null/*oldData*/);
189 queue.add(actResource);
190 return actResource.getResultFuture();
193 .immediateFailedFuture(new IllegalStateException("Queue missing for provided shardResource "
194 + shardResource.name()));
197 public void merge(String resourceType, InstanceIdentifier<?> identifier, DataObject updatedData) {
198 BlockingQueue<ActionableResource<?>> queue = getQueue(resourceType);
200 beforeModification(resourceType, identifier);
201 ActionableResource<?> actResource = new ActionableResourceImpl<>(
202 identifier, ActionableResource.UPDATE, updatedData, null/*oldData*/);
203 queue.add(actResource);
207 public ListenableFuture<Void> delete(ShardResource shardResource, InstanceIdentifier<?> identifier) {
208 BlockingQueue<ActionableResource<?>> queue = shardResource.getQueue();
210 beforeModification(shardResource.name(), identifier);
211 ActionableResource actResource = new ActionableResourceImpl<>(
212 identifier, ActionableResource.DELETE, null, null/*oldData*/);
213 queue.add(actResource);
214 return actResource.getResultFuture();
217 .immediateFailedFuture(new IllegalStateException("Queue missing for provided shardResource "
218 + shardResource.name()));
221 public void delete(String resourceType, InstanceIdentifier<?> identifier) {
222 BlockingQueue<ActionableResource<?>> queue = getQueue(resourceType);
224 beforeModification(resourceType, identifier);
225 ActionableResource<?> actResource = new ActionableResourceImpl<>(
226 identifier, ActionableResource.DELETE, null, null/*oldData*/);
227 queue.add(actResource);
231 public ListenableFuture<Void> put(ShardResource shardResource, InstanceIdentifier<?> identifier,
232 DataObject updatedData) {
233 BlockingQueue<ActionableResource<?>> queue = shardResource.getQueue();
235 beforeModification(shardResource.name(), identifier);
236 ActionableResource<?> actResource = new ActionableResourceImpl<>(
237 identifier, ActionableResource.CREATE, updatedData, null/*oldData*/);
238 queue.add(actResource);
239 return actResource.getResultFuture();
242 .immediateFailedFuture(new IllegalStateException("Queue missing for provided shardResource "
243 + shardResource.name()));
246 public void put(String resourceType, InstanceIdentifier<?> identifier, DataObject updatedData) {
247 BlockingQueue<ActionableResource<?>> queue = getQueue(resourceType);
249 beforeModification(resourceType, identifier);
250 ActionableResource<?> actResource = new ActionableResourceImpl<>(
251 identifier, ActionableResource.CREATE, updatedData, null/*oldData*/);
252 queue.add(actResource);
256 private BlockingQueue<ActionableResource<?>> getQueue(String resourceType) {
257 if (resourceHandlerMapper.containsKey(resourceType)) {
258 return resourceHandlerMapper.get(resourceType).getLeft();
263 public void deregisterBatchableResource(String resourceType) {
264 ScheduledExecutorService scheduledThreadPoolExecutor = resourceBatchingThreadMapper.get(resourceType);
265 if (scheduledThreadPoolExecutor != null) {
266 scheduledThreadPoolExecutor.shutdown();
268 resourceHandlerMapper.remove(resourceType);
269 resourceBatchingThreadMapper.remove(resourceType);
272 private class Batcher implements Runnable {
273 private final String resourceType;
275 Batcher(String resourceType) {
276 this.resourceType = resourceType;
281 List<ActionableResource<?>> resList = new ArrayList<>();
284 Pair<BlockingQueue<ActionableResource<?>>, ResourceHandler> resMapper =
285 resourceHandlerMapper.get(resourceType);
286 if (resMapper == null) {
287 LOG.error("Unable to find resourceMapper for batching the ResourceType {}", resourceType);
290 BlockingQueue<ActionableResource<?>> resQueue = resMapper.getLeft();
291 ResourceHandler resHandler = resMapper.getRight();
292 resList.add(resQueue.take());
293 resQueue.drainTo(resList);
295 long start = System.currentTimeMillis();
296 int batchSize = resHandler.getBatchSize();
298 int batches = resList.size() / batchSize;
299 if (resList.size() > batchSize) {
300 LOG.info("Batched up resources of size {} into batches {} for resourcetype {}",
301 resList.size(), batches, resourceType);
302 for (int i = 0, j = 0; i < batches; j = j + batchSize,i++) {
303 new MdsalDsTask<>(resourceType, resList.subList(j, j + batchSize)).process();
305 // process remaining routes
306 LOG.trace("Picked up 1 size {} ", resList.subList(batches * batchSize, resList.size()).size());
307 new MdsalDsTask<>(resourceType, resList.subList(batches * batchSize, resList.size())).process();
309 // process less than OR == batchsize routes
310 LOG.trace("Picked up 2 size {}", resList.size());
311 new MdsalDsTask<>(resourceType, resList).process();
314 long timetaken = System.currentTimeMillis() - start;
315 LOG.debug("Total taken ##time = {}ms for resourceList of size {} for resourceType {}",
316 timetaken, resList.size(), resourceType);
318 } catch (InterruptedException e) {
319 LOG.error("InterruptedException during run()", e);
325 private class MdsalDsTask<T extends DataObject> {
327 List<ActionableResource<?>> actResourceList;
329 MdsalDsTask(String resourceType, List<ActionableResource<?>> actResourceList) {
330 this.resourceType = resourceType;
331 this.actResourceList = actResourceList;
334 @SuppressWarnings("unchecked")
335 public void process() {
336 LOG.trace("Picked up 3 size {} of resourceType {}", actResourceList.size(), resourceType);
337 Pair<BlockingQueue<ActionableResource<?>>, ResourceHandler> resMapper =
338 resourceHandlerMapper.get(resourceType);
339 if (resMapper == null) {
340 LOG.error("Unable to find resourceMapper for batching the ResourceType {}", resourceType);
343 ResourceHandler resHandler = resMapper.getRight();
344 DataBroker broker = resHandler.getResourceBroker();
345 LogicalDatastoreType dsType = resHandler.getDatastoreType();
346 ReadWriteTransaction tx = broker.newReadWriteTransaction();
347 List<SubTransaction> transactionObjects = new ArrayList<>();
348 Map<SubTransaction, SettableFuture<Void>> txMap = new HashMap<>();
349 for (ActionableResource<?> actResource : actResourceList) {
350 int startSize = transactionObjects.size();
351 switch (actResource.getAction()) {
352 case ActionableResource.CREATE:
353 resHandler.create(tx, dsType, actResource.getInstanceIdentifier(), actResource.getInstance(),
356 case ActionableResource.UPDATE:
357 Object updated = actResource.getInstance();
358 Object original = actResource.getOldInstance();
359 resHandler.update(tx, dsType, actResource.getInstanceIdentifier(), original,
360 updated,transactionObjects);
362 case ActionableResource.UPDATECONTAINER:
363 Object updatedContainer = actResource.getInstance();
364 Object originalContainer = actResource.getOldInstance();
365 resHandler.updateContainer(tx, dsType, actResource.getInstanceIdentifier(),
366 originalContainer, updatedContainer,transactionObjects);
368 case ActionableResource.DELETE:
369 resHandler.delete(tx, dsType, actResource.getInstanceIdentifier(), actResource.getInstance(),
372 case ActionableResource.READ:
373 ActionableReadResource<DataObject> readAction = (ActionableReadResource<DataObject>)actResource;
374 ListenableFuture<Optional<DataObject>> future =
375 tx.read(dsType, readAction.getInstanceIdentifier());
376 Futures.addCallback(future, new FutureCallback<Optional<DataObject>>() {
378 public void onSuccess(Optional<DataObject> result) {
379 readAction.getReadFuture().set(result);
383 public void onFailure(Throwable failure) {
384 readAction.getReadFuture().setException(failure);
386 }, MoreExecutors.directExecutor());
389 LOG.error("Unable to determine Action for ResourceType {} with ResourceKey {}",
390 resourceType, actResource);
392 int endSize = transactionObjects.size();
393 if (endSize > startSize) {
394 txMap.put(transactionObjects.get(endSize - 1),
395 (SettableFuture<Void>) actResource.getResultFuture());
400 long start = System.currentTimeMillis();
401 FluentFuture<? extends @NonNull CommitInfo> futures = tx.commit();
405 actResourceList.forEach(actionableResource -> {
406 ((SettableFuture<Void>) actionableResource.getResultFuture()).set(null);
407 postCommit(actionableResource.getAction(), actionableResource.getInstanceIdentifier());
409 long time = System.currentTimeMillis() - start;
410 LOG.trace("##### Time taken for {} = {}ms", actResourceList.size(), time);
412 } catch (InterruptedException | ExecutionException e) {
413 LOG.error("Exception occurred while batch writing to datastore", e);
414 LOG.info("Trying to submit transaction operations one at a time for resType {}", resourceType);
415 for (SubTransaction object : transactionObjects) {
416 WriteTransaction writeTransaction = broker.newWriteOnlyTransaction();
417 switch (object.getAction()) {
418 case SubTransaction.CREATE:
419 writeTransaction.mergeParentStructurePut(dsType, object.getInstanceIdentifier(),
420 (DataObject) object.getInstance());
422 case SubTransaction.DELETE:
423 writeTransaction.delete(dsType, object.getInstanceIdentifier());
425 case SubTransaction.UPDATE:
426 writeTransaction.mergeParentStructureMerge(dsType, object.getInstanceIdentifier(),
427 (DataObject) object.getInstance());
430 LOG.error("Unable to determine Action for transaction object with id {}",
431 object.getInstanceIdentifier());
433 FluentFuture<? extends @NonNull CommitInfo> futureOperation = writeTransaction.commit();
435 futureOperation.get();
436 if (txMap.containsKey(object)) {
437 txMap.get(object).set(null);
439 LOG.error("Subtx object {} has no Actionable-resource associated with it !! ",
440 object.getInstanceIdentifier());
442 } catch (InterruptedException | ExecutionException exception) {
443 if (txMap.containsKey(object)) {
444 txMap.get(object).setException(exception);
446 LOG.error("Error {} to datastore (path, data) : ({}, {})", object.getAction(),
447 object.getInstanceIdentifier(), object.getInstance(), exception);
449 postCommit(object.getAction(), object.getInstanceIdentifier());
455 private void postCommit(int action, InstanceIdentifier iid) {
457 case ActionableResource.CREATE:
458 case ActionableResource.UPDATE:
459 case ActionableResource.DELETE:
460 afterModification(resourceType, iid);
468 private static class ActionableReadResource<T extends DataObject> extends ActionableResourceImpl<T> {
469 private final SettableFuture<Optional<T>> readFuture;
471 ActionableReadResource(InstanceIdentifier<T> identifier, SettableFuture<Optional<T>> readFuture) {
472 super(identifier, ActionableResource.READ, null, null);
473 this.readFuture = readFuture;
476 SettableFuture<Optional<T>> getReadFuture() {