Bump upstreams for Silicon
[genius.git] / mdsalutil / mdsalutil-api / src / main / java / org / opendaylight / genius / utils / batching / ResourceBatchingManager.java
1 /*
2  * Copyright (c) 2015 - 2018 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.genius.utils.batching;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.util.concurrent.FluentFuture;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import com.google.common.util.concurrent.SettableFuture;
18 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
19 import java.util.ArrayList;
20 import java.util.HashMap;
21 import java.util.List;
22 import java.util.Map;
23 import java.util.Optional;
24 import java.util.Set;
25 import java.util.concurrent.BlockingQueue;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.LinkedBlockingQueue;
29 import java.util.concurrent.ScheduledExecutorService;
30 import java.util.concurrent.TimeUnit;
31 import org.apache.commons.lang3.tuple.ImmutablePair;
32 import org.apache.commons.lang3.tuple.Pair;
33 import org.eclipse.jdt.annotation.NonNull;
34 import org.opendaylight.infrautils.utils.concurrent.Executors;
35 import org.opendaylight.mdsal.binding.api.DataBroker;
36 import org.opendaylight.mdsal.binding.api.ReadTransaction;
37 import org.opendaylight.mdsal.binding.api.ReadWriteTransaction;
38 import org.opendaylight.mdsal.binding.api.WriteTransaction;
39 import org.opendaylight.mdsal.common.api.CommitInfo;
40 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
41 import org.opendaylight.mdsal.common.api.ReadFailedException;
42 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
43 import org.opendaylight.yangtools.yang.binding.DataObject;
44 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 /**
49  * This class lets other modules submit their CRUD methods to it. This class
50  * will then supply a single transaction to such CRUD methods of the
51  * subscribers, on which such subscribers write data to that transaction.
52  * Finally the framework attempts to reliably write this single transaction
53  * which represents a batch of an ordered list of entities owned by that subscriber,
54  * to be written/updated/removed from a specific datastore as registered by the subscriber.
55  */
56 public class ResourceBatchingManager implements AutoCloseable {
57
    private static final Logger LOG = LoggerFactory.getLogger(ResourceBatchingManager.class);

    // NOTE(review): INITIAL_DELAY is not referenced anywhere in this class; registerBatchableResource
    // passes the handler's batch interval as the initial delay instead.
    private static final int INITIAL_DELAY = 3000;
    // Unit in which a handler's batch interval is interpreted when its Batcher is scheduled.
    private static final TimeUnit TIME_UNIT = TimeUnit.MILLISECONDS;

    // Fallback values used by registerDefaultBatchHandlers when the matching system properties are not set.
    private static final int PERIODICITY_IN_MS = 500;
    private static final int BATCH_SIZE = 1000;
65
66     public enum ShardResource {
67         CONFIG_TOPOLOGY(LogicalDatastoreType.CONFIGURATION),
68         OPERATIONAL_TOPOLOGY(LogicalDatastoreType.OPERATIONAL),
69         CONFIG_INVENTORY(LogicalDatastoreType.CONFIGURATION),
70         OPERATIONAL_INVENTORY(LogicalDatastoreType.OPERATIONAL);
71
72         BlockingQueue<ActionableResource<?>> queue = new LinkedBlockingQueue<>();
73         LogicalDatastoreType datastoreType;
74
75         ShardResource(LogicalDatastoreType datastoreType) {
76             this.datastoreType = datastoreType;
77         }
78
79         public LogicalDatastoreType getDatastoreType() {
80             return datastoreType;
81         }
82
83         BlockingQueue<ActionableResource<?>> getQueue() {
84             return queue;
85         }
86     }
87
    // Per resource type: the queue callers enqueue into, paired with the handler that applies queued entries.
    private final ConcurrentHashMap<String, Pair<BlockingQueue<ActionableResource<?>>, ResourceHandler>>
            resourceHandlerMapper = new ConcurrentHashMap<>();

    // Per resource type: the scheduled executor that periodically runs that type's Batcher.
    private final ConcurrentHashMap<String, ScheduledExecutorService>
            resourceBatchingThreadMapper = new ConcurrentHashMap<>();

    // Per resource type: identifiers enqueued for modification whose commit attempt has not finished yet.
    // read() consults this to decide whether a read has to be routed through the queue.
    private final Map<String, Set<InstanceIdentifier<?>>> pendingModificationByResourceType = new ConcurrentHashMap<>();
95
96     private static ResourceBatchingManager instance;
97
98     static {
99         instance = new ResourceBatchingManager();
100     }
101
102     public static ResourceBatchingManager getInstance() {
103         return instance;
104     }
105
106     @Override
107     public void close() {
108         LOG.trace("ResourceBatchingManager Closed, closing all batched resources");
109         resourceBatchingThreadMapper.values().forEach(ScheduledExecutorService::shutdown);
110     }
111
112     public void registerBatchableResource(final String resourceType,
113             final BlockingQueue<ActionableResource<?>> resQueue, final ResourceHandler resHandler) {
114         requireNonNull(resQueue, "ResourceQueue to use for batching cannot not be null.");
115         requireNonNull(resHandler, "ResourceHandler cannot not be null.");
116
117         resourceHandlerMapper.put(resourceType, new ImmutablePair<>(resQueue, resHandler));
118         ScheduledExecutorService resDelegatorService =
119                 Executors.newListeningScheduledThreadPool(1, "ResourceBatchingManager", LOG);
120         resourceBatchingThreadMapper.put(resourceType, resDelegatorService);
121         LOG.info("Registered resourceType {} with batchSize {} and batchInterval {}", resourceType,
122                 resHandler.getBatchSize(), resHandler.getBatchInterval());
123         resDelegatorService.scheduleWithFixedDelay(
124                 new Batcher(resourceType), resHandler.getBatchInterval(), resHandler.getBatchInterval(), TIME_UNIT);
125         pendingModificationByResourceType.putIfAbsent(resourceType, ConcurrentHashMap.newKeySet());
126     }
127
128     public void registerDefaultBatchHandlers(DataBroker broker) {
129         LOG.trace("Registering default batch handlers");
130         Integer batchSize = Integer.getInteger("resource.manager.batch.size", BATCH_SIZE);
131         Integer batchInterval = Integer.getInteger("resource.manager.batch.periodicity.ms", PERIODICITY_IN_MS);
132
133         for (ShardResource shardResource : ShardResource.values()) {
134             if (resourceHandlerMapper.containsKey(shardResource.name())) {
135                 continue;
136             }
137             DefaultBatchHandler batchHandler = new DefaultBatchHandler(broker, shardResource.datastoreType, batchSize,
138                     batchInterval);
139             registerBatchableResource(shardResource.name(), shardResource.getQueue(), batchHandler);
140         }
141     }
142
143     private void beforeModification(String resoureType, InstanceIdentifier<?> iid) {
144         pendingModificationByResourceType.get(resoureType).add(iid);
145     }
146
147     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
148         justification = "https://github.com/spotbugs/spotbugs/issues/811")
149     private void afterModification(String resoureType, InstanceIdentifier<?> iid) {
150         pendingModificationByResourceType.get(resoureType).remove(iid);
151     }
152
153     /**
154      * Reads the identifier of the given resource type.
155      * Not to be used by the applications  which uses their own resource queue
156      *
157      * @param resourceType resource type that was registered with batch manager
158      * @param identifier   identifier to be read
159      * @param <T> DataObject subclass
160      * @return a CheckFuture containing the result of the read
161      */
162     public <T extends DataObject> FluentFuture<Optional<T>> read(
163             String resourceType, InstanceIdentifier<T> identifier) throws InterruptedException, ExecutionException {
164         BlockingQueue<ActionableResource<?>> queue = getQueue(resourceType);
165         if (queue != null) {
166             if (pendingModificationByResourceType.get(resourceType).contains(identifier)) {
167                 SettableFuture<Optional<T>> readFuture = SettableFuture.create();
168                 queue.add(new ActionableReadResource<>(identifier, readFuture));
169                 return FluentFutures.immediateFluentFuture(readFuture.get());
170             } else {
171                 ResourceHandler resourceHandler = resourceHandlerMapper.get(resourceType).getRight();
172                 try (ReadTransaction tx = resourceHandler.getResourceBroker().newReadOnlyTransaction()) {
173                     return tx.read(resourceHandler.getDatastoreType(), identifier);
174                 }
175             }
176         }
177
178         return FluentFutures.immediateFailedFluentFuture(new ReadFailedException(
179                 "No batch handler was registered for resource " + resourceType));
180     }
181
182     public ListenableFuture<Void> merge(ShardResource shardResource, InstanceIdentifier<?> identifier,
183                                         DataObject updatedData) {
184         BlockingQueue<ActionableResource<?>> queue = shardResource.getQueue();
185         if (queue != null) {
186             beforeModification(shardResource.name(), identifier);
187             ActionableResource<?> actResource = new ActionableResourceImpl<>(
188                     identifier, ActionableResource.UPDATE, updatedData, null/*oldData*/);
189             queue.add(actResource);
190             return actResource.getResultFuture();
191         }
192         return Futures
193                 .immediateFailedFuture(new IllegalStateException("Queue missing for provided shardResource "
194                         + shardResource.name()));
195     }
196
197     public void merge(String resourceType, InstanceIdentifier<?> identifier, DataObject updatedData) {
198         BlockingQueue<ActionableResource<?>> queue = getQueue(resourceType);
199         if (queue != null) {
200             beforeModification(resourceType, identifier);
201             ActionableResource<?> actResource = new ActionableResourceImpl<>(
202                     identifier, ActionableResource.UPDATE, updatedData, null/*oldData*/);
203             queue.add(actResource);
204         }
205     }
206
207     public ListenableFuture<Void> delete(ShardResource shardResource, InstanceIdentifier<?> identifier) {
208         BlockingQueue<ActionableResource<?>> queue = shardResource.getQueue();
209         if (queue != null) {
210             beforeModification(shardResource.name(), identifier);
211             ActionableResource actResource = new ActionableResourceImpl<>(
212                     identifier, ActionableResource.DELETE, null, null/*oldData*/);
213             queue.add(actResource);
214             return actResource.getResultFuture();
215         }
216         return Futures
217                 .immediateFailedFuture(new IllegalStateException("Queue missing for provided shardResource "
218                         + shardResource.name()));
219     }
220
221     public void delete(String resourceType, InstanceIdentifier<?> identifier) {
222         BlockingQueue<ActionableResource<?>> queue = getQueue(resourceType);
223         if (queue != null) {
224             beforeModification(resourceType, identifier);
225             ActionableResource<?> actResource = new ActionableResourceImpl<>(
226                     identifier, ActionableResource.DELETE, null, null/*oldData*/);
227             queue.add(actResource);
228         }
229     }
230
231     public ListenableFuture<Void> put(ShardResource shardResource, InstanceIdentifier<?> identifier,
232                                       DataObject updatedData) {
233         BlockingQueue<ActionableResource<?>> queue = shardResource.getQueue();
234         if (queue != null) {
235             beforeModification(shardResource.name(), identifier);
236             ActionableResource<?> actResource = new ActionableResourceImpl<>(
237                     identifier, ActionableResource.CREATE, updatedData, null/*oldData*/);
238             queue.add(actResource);
239             return actResource.getResultFuture();
240         }
241         return Futures
242                 .immediateFailedFuture(new IllegalStateException("Queue missing for provided shardResource "
243                         + shardResource.name()));
244     }
245
246     public void put(String resourceType, InstanceIdentifier<?> identifier, DataObject updatedData) {
247         BlockingQueue<ActionableResource<?>> queue = getQueue(resourceType);
248         if (queue != null) {
249             beforeModification(resourceType, identifier);
250             ActionableResource<?> actResource = new ActionableResourceImpl<>(
251                     identifier, ActionableResource.CREATE, updatedData, null/*oldData*/);
252             queue.add(actResource);
253         }
254     }
255
256     private BlockingQueue<ActionableResource<?>> getQueue(String resourceType) {
257         if (resourceHandlerMapper.containsKey(resourceType)) {
258             return resourceHandlerMapper.get(resourceType).getLeft();
259         }
260         return null;
261     }
262
263     public void deregisterBatchableResource(String resourceType) {
264         ScheduledExecutorService scheduledThreadPoolExecutor = resourceBatchingThreadMapper.get(resourceType);
265         if (scheduledThreadPoolExecutor != null) {
266             scheduledThreadPoolExecutor.shutdown();
267         }
268         resourceHandlerMapper.remove(resourceType);
269         resourceBatchingThreadMapper.remove(resourceType);
270     }
271
272     private class Batcher implements Runnable {
273         private final String resourceType;
274
275         Batcher(String resourceType) {
276             this.resourceType = resourceType;
277         }
278
279         @Override
280         public void run() {
281             List<ActionableResource<?>> resList = new ArrayList<>();
282
283             try {
284                 Pair<BlockingQueue<ActionableResource<?>>, ResourceHandler> resMapper =
285                         resourceHandlerMapper.get(resourceType);
286                 if (resMapper == null) {
287                     LOG.error("Unable to find resourceMapper for batching the ResourceType {}", resourceType);
288                     return;
289                 }
290                 BlockingQueue<ActionableResource<?>> resQueue = resMapper.getLeft();
291                 ResourceHandler resHandler = resMapper.getRight();
292                 resList.add(resQueue.take());
293                 resQueue.drainTo(resList);
294
295                 long start = System.currentTimeMillis();
296                 int batchSize = resHandler.getBatchSize();
297
298                 int batches = resList.size() / batchSize;
299                 if (resList.size() > batchSize) {
300                     LOG.info("Batched up resources of size {} into batches {} for resourcetype {}",
301                             resList.size(), batches, resourceType);
302                     for (int i = 0, j = 0; i < batches; j = j + batchSize,i++) {
303                         new MdsalDsTask<>(resourceType, resList.subList(j, j + batchSize)).process();
304                     }
305                     // process remaining routes
306                     LOG.trace("Picked up 1 size {} ", resList.subList(batches * batchSize, resList.size()).size());
307                     new MdsalDsTask<>(resourceType, resList.subList(batches * batchSize, resList.size())).process();
308                 } else {
309                     // process less than OR == batchsize routes
310                     LOG.trace("Picked up 2 size {}", resList.size());
311                     new MdsalDsTask<>(resourceType, resList).process();
312                 }
313
314                 long timetaken = System.currentTimeMillis() - start;
315                 LOG.debug("Total taken ##time = {}ms for resourceList of size {} for resourceType {}",
316                         timetaken, resList.size(), resourceType);
317
318             } catch (InterruptedException e) {
319                 LOG.error("InterruptedException during run()", e);
320             }
321
322         }
323     }
324
    /**
     * Applies one batch of queued resources of a resource type through a single read-write
     * transaction. If committing that transaction fails, the recorded sub-transactions are
     * re-submitted one at a time, each in its own write-only transaction.
     *
     * <p>NOTE(review): the type parameter {@code T} is not used anywhere in the class body.
     */
    private class MdsalDsTask<T extends DataObject> {
        String resourceType;
        List<ActionableResource<?>> actResourceList;

        MdsalDsTask(String resourceType, List<ActionableResource<?>> actResourceList) {
            this.resourceType = resourceType;
            this.actResourceList = actResourceList;
        }

        /**
         * Dispatches every resource in the batch to the registered handler on a shared
         * transaction, commits it, and completes the resources' result futures. Returns
         * without doing anything if the resource type is no longer registered.
         */
        @SuppressWarnings("unchecked")
        public void process() {
            LOG.trace("Picked up 3 size {} of resourceType {}", actResourceList.size(), resourceType);
            Pair<BlockingQueue<ActionableResource<?>>, ResourceHandler> resMapper =
                    resourceHandlerMapper.get(resourceType);
            if (resMapper == null) {
                LOG.error("Unable to find resourceMapper for batching the ResourceType {}", resourceType);
                return;
            }
            ResourceHandler resHandler = resMapper.getRight();
            DataBroker broker = resHandler.getResourceBroker();
            LogicalDatastoreType dsType = resHandler.getDatastoreType();
            ReadWriteTransaction tx = broker.newReadWriteTransaction();
            // Handed to every handler call; presumably the handler appends one SubTransaction per
            // operation it performs (TODO confirm against ResourceHandler implementations).
            List<SubTransaction> transactionObjects = new ArrayList<>();
            // Maps the last sub-transaction recorded for a resource to that resource's result future,
            // so the one-at-a-time fallback can complete the right future.
            Map<SubTransaction, SettableFuture<Void>> txMap = new HashMap<>();
            for (ActionableResource<?> actResource : actResourceList) {
                int startSize = transactionObjects.size();
                switch (actResource.getAction()) {
                    case ActionableResource.CREATE:
                        resHandler.create(tx, dsType, actResource.getInstanceIdentifier(), actResource.getInstance(),
                                transactionObjects);
                        break;
                    case ActionableResource.UPDATE:
                        Object updated = actResource.getInstance();
                        Object original = actResource.getOldInstance();
                        resHandler.update(tx, dsType, actResource.getInstanceIdentifier(), original,
                                updated,transactionObjects);
                        break;
                    case ActionableResource.UPDATECONTAINER:
                        Object updatedContainer = actResource.getInstance();
                        Object originalContainer = actResource.getOldInstance();
                        resHandler.updateContainer(tx, dsType, actResource.getInstanceIdentifier(),
                                originalContainer, updatedContainer,transactionObjects);
                        break;
                    case ActionableResource.DELETE:
                        resHandler.delete(tx, dsType, actResource.getInstanceIdentifier(), actResource.getInstance(),
                                transactionObjects);
                        break;
                    case ActionableResource.READ:
                        // Reads are issued on the same transaction as the batch and their outcome is
                        // forwarded to the future the reader holds.
                        ActionableReadResource<DataObject> readAction = (ActionableReadResource<DataObject>)actResource;
                        ListenableFuture<Optional<DataObject>> future =
                                tx.read(dsType, readAction.getInstanceIdentifier());
                        Futures.addCallback(future, new FutureCallback<Optional<DataObject>>() {
                            @Override
                            public void onSuccess(Optional<DataObject> result) {
                                readAction.getReadFuture().set(result);
                            }

                            @Override
                            public void onFailure(Throwable failure) {
                                readAction.getReadFuture().setException(failure);
                            }
                        }, MoreExecutors.directExecutor());
                        break;
                    default:
                        LOG.error("Unable to determine Action for ResourceType {} with ResourceKey {}",
                                resourceType, actResource);
                }
                // Only when this resource added at least one sub-transaction is its result future
                // remembered, keyed by the last sub-transaction added.
                int endSize = transactionObjects.size();
                if (endSize > startSize) {
                    txMap.put(transactionObjects.get(endSize - 1),
                            (SettableFuture<Void>) actResource.getResultFuture());
                }
            }


            long start = System.currentTimeMillis();
            FluentFuture<? extends @NonNull CommitInfo> futures = tx.commit();

            try {
                // Wait for the batch commit; on success every resource in the batch is completed.
                futures.get();
                actResourceList.forEach(actionableResource -> {
                    ((SettableFuture<Void>) actionableResource.getResultFuture()).set(null);
                    postCommit(actionableResource.getAction(), actionableResource.getInstanceIdentifier());
                });
                long time = System.currentTimeMillis() - start;
                LOG.trace("##### Time taken for {} = {}ms", actResourceList.size(), time);

            } catch (InterruptedException | ExecutionException e) {
                // NOTE(review): an InterruptedException is handled like a commit failure and the
                // thread's interrupt flag is not restored — confirm this is intended.
                LOG.error("Exception occurred while batch writing to datastore", e);
                LOG.info("Trying to submit transaction operations one at a time for resType {}", resourceType);
                for (SubTransaction object : transactionObjects) {
                    WriteTransaction writeTransaction = broker.newWriteOnlyTransaction();
                    switch (object.getAction()) {
                        case SubTransaction.CREATE:
                            writeTransaction.mergeParentStructurePut(dsType, object.getInstanceIdentifier(),
                                    (DataObject) object.getInstance());
                            break;
                        case SubTransaction.DELETE:
                            writeTransaction.delete(dsType, object.getInstanceIdentifier());
                            break;
                        case SubTransaction.UPDATE:
                            writeTransaction.mergeParentStructureMerge(dsType, object.getInstanceIdentifier(),
                                    (DataObject) object.getInstance());
                            break;
                        default:
                            LOG.error("Unable to determine Action for transaction object with id {}",
                                    object.getInstanceIdentifier());
                    }
                    FluentFuture<? extends @NonNull CommitInfo> futureOperation = writeTransaction.commit();
                    try {
                        futureOperation.get();
                        // Only the last sub-transaction of a resource is present in txMap; any earlier
                        // sub-transaction of the same resource takes the error-logging branch below.
                        if (txMap.containsKey(object)) {
                            txMap.get(object).set(null);
                        } else {
                            LOG.error("Subtx object {} has no Actionable-resource associated with it !! ",
                                    object.getInstanceIdentifier());
                        }
                    } catch (InterruptedException | ExecutionException exception) {
                        if (txMap.containsKey(object)) {
                            txMap.get(object).setException(exception);
                        }
                        LOG.error("Error {} to datastore (path, data) : ({}, {})", object.getAction(),
                                object.getInstanceIdentifier(), object.getInstance(), exception);
                    } finally {
                        // NOTE(review): postCommit switches on ActionableResource action constants but is
                        // given a SubTransaction action here — presumably the values line up; confirm.
                        postCommit(object.getAction(), object.getInstanceIdentifier());
                    }
                }
            }
        }

        /**
         * Clears the pending-modification mark for {@code iid} when the action was a create,
         * update or delete; other actions are ignored.
         *
         * @param action action code of the processed entry
         * @param iid identifier the entry addressed
         */
        private void postCommit(int action, InstanceIdentifier iid) {
            switch (action) {
                case ActionableResource.CREATE:
                case ActionableResource.UPDATE:
                case ActionableResource.DELETE:
                    afterModification(resourceType, iid);
                    break;
                default:
                    break;
            }
        }
    }
467
468     private static class ActionableReadResource<T extends DataObject> extends ActionableResourceImpl<T> {
469         private final SettableFuture<Optional<T>> readFuture;
470
471         ActionableReadResource(InstanceIdentifier<T> identifier, SettableFuture<Optional<T>> readFuture) {
472             super(identifier, ActionableResource.READ, null, null);
473             this.readFuture = readFuture;
474         }
475
476         SettableFuture<Optional<T>> getReadFuture() {
477             return readFuture;
478         }
479     }
480 }