Modification in ResourceBatchingManager.java
[genius.git] / mdsalutil / mdsalutil-api / src / main / java / org / opendaylight / genius / utils / batching / ResourceBatchingManager.java
1 /*
2  * Copyright (c) 2015 - 2016 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.genius.utils.batching;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.util.concurrent.CheckedFuture;
12 import org.apache.commons.lang3.tuple.ImmutablePair;
13 import org.apache.commons.lang3.tuple.Pair;
14 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
15 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
16 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
17 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
18 import org.opendaylight.yangtools.yang.binding.DataObject;
19 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
22
23 import java.util.ArrayList;
24 import java.util.List;
25 import java.util.concurrent.*;
26
27 public class ResourceBatchingManager implements AutoCloseable {
28     private static final Logger LOG = LoggerFactory.getLogger(ResourceBatchingManager.class);
29     private static final int INITIAL_DELAY = 3000;
30     private static final TimeUnit TIME_UNIT = TimeUnit.MILLISECONDS;
31
32     private DataBroker broker;
33     private ConcurrentHashMap<String, Pair<BlockingQueue, ResourceHandler>> resourceHandlerMapper = new ConcurrentHashMap();
34     private ConcurrentHashMap<String, ScheduledThreadPoolExecutor> resourceBatchingThreadMapper = new ConcurrentHashMap();
35
36     private static ResourceBatchingManager instance;
37
38     static {
39         instance = new ResourceBatchingManager();
40     }
41
42     public static ResourceBatchingManager getInstance() {
43         return instance;
44     }
45
46     @Override
47     public void close() throws Exception {
48         LOG.trace("ResourceBatchingManager Closed");
49     }
50
51     public void registerBatchableResource(String resourceType, final BlockingQueue<ActionableResource> resQueue, final ResourceHandler resHandler) {
52         Preconditions.checkNotNull(resQueue, "ResourceQueue to use for batching cannot not be null.");
53         Preconditions.checkNotNull(resHandler, "ResourceHandler cannot not be null.");
54         resourceHandlerMapper.put(resourceType, new ImmutablePair<BlockingQueue, ResourceHandler>(resQueue, resHandler));
55         ScheduledThreadPoolExecutor resDelegatorService =(ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1);
56         resourceBatchingThreadMapper.put(resourceType, resDelegatorService);
57         LOG.info("Registered resourceType {} with batchSize {} and batchInterval {}", resourceType,
58                 resHandler.getBatchSize(), resHandler.getBatchInterval());
59         if (resDelegatorService.getPoolSize() == 0 )
60             resDelegatorService.scheduleWithFixedDelay(new Batcher(resourceType), INITIAL_DELAY, resHandler.getBatchInterval(), TIME_UNIT);
61     }
62
63     public void deregisterBatchableResource(String resourceType) {
64         resourceHandlerMapper.remove(resourceType);
65         resourceBatchingThreadMapper.remove(resourceType);
66     }
67
68     private class Batcher implements Runnable
69     {
70         private String resourceType;
71
72         Batcher(String resourceType) {
73             this.resourceType = resourceType;
74         }
75
76         public void run()
77         {
78             List<ActionableResource> resList = new ArrayList<>();
79
80             try
81             {
82                 Pair<BlockingQueue, ResourceHandler> resMapper = resourceHandlerMapper.get(resourceType);
83                 if (resMapper == null) {
84                     LOG.error("Unable to find resourceMapper for batching the ResourceType {}", resourceType);
85                     return;
86                 }
87                 BlockingQueue<ActionableResource> resQueue = resMapper.getLeft();
88                 ResourceHandler resHandler = resMapper.getRight();
89                 resList.add(resQueue.take());
90                 resQueue.drainTo(resList);
91
92                 long start = System.currentTimeMillis();
93                 int batchSize = resHandler.getBatchSize();
94
95                 int batches = resList.size()/ batchSize;
96                 LOG.trace("Picked up size {} batches {}", resList.size(), batches);
97                 if ( resList.size() > batchSize)
98                 {
99                     for (int i = 0, j = 0; i < batches; j = j + batchSize,i++)
100                     {
101                         new MdsalDsTask<>(resourceType, resList.subList(j, j + batchSize)).process();
102                     }
103                     // process remaining routes
104                     LOG.trace("Picked up 1 size {} ", resList.subList(batches * batchSize, resList.size()).size());
105                     new MdsalDsTask<>(resourceType, resList.subList(batches * batchSize, resList.size())).process();
106                 } else {
107                     // process less than OR == batchsize routes
108                     LOG.trace("Picked up 2 size {}", resList.size());
109                     new MdsalDsTask<>(resourceType, resList).process();
110                 }
111
112                 long timetaken = System.currentTimeMillis() - start;
113                 LOG.info( "Total time taken for resourceList of size: " + resList.size() + " ### time =  " + timetaken);
114
115             } catch (InterruptedException e)
116             {
117                 e.printStackTrace();
118             }
119
120         }
121     }
122
123     private class MdsalDsTask<T extends DataObject>
124     {
125         String resourceType;
126         List<ActionableResource> actResourceList;
127
128         public MdsalDsTask(String resourceType, List<ActionableResource> actResourceList)
129         {
130             this.resourceType = resourceType;
131             this.actResourceList = actResourceList;
132         }
133
134         public void process() {
135             InstanceIdentifier<T> identifier;
136             Object instance;
137             try {
138                 LOG.trace("Picked up 3 size {} of resourceType {}", actResourceList.size(), resourceType);
139                 Pair<BlockingQueue, ResourceHandler> resMapper = resourceHandlerMapper.get(resourceType);
140                 if (resMapper == null) {
141                     LOG.error("Unable to find resourceMapper for batching the ResourceType {}", resourceType);
142                     return;
143                 }
144                 ResourceHandler resHandler = resMapper.getRight();
145                 DataBroker broker = resHandler.getResourceBroker();
146                 LogicalDatastoreType dsType = resHandler.getDatastoreType();
147                 WriteTransaction tx = broker.newWriteOnlyTransaction();
148                 for (ActionableResource actResource : actResourceList)
149                 {
150                     switch (actResource.getAction()) {
151                         case ActionableResource.CREATE:
152                             identifier = actResource.getInstanceIdentifier();
153                             instance = actResource.getInstance();
154                             resHandler.create(tx, dsType, identifier, instance);
155                             break;
156                         case ActionableResource.UPDATE:
157                             identifier = actResource.getInstanceIdentifier();
158                             Object updated = actResource.getInstance();
159                             Object original = actResource.getOldInstance();
160                             resHandler.update(tx, dsType, identifier, original, updated);
161                             break;
162                         case ActionableResource.DELETE:
163                             identifier = actResource.getInstanceIdentifier();
164                             instance = actResource.getInstance();
165                             resHandler.delete(tx, dsType, identifier, instance);
166                             break;
167                         default:
168                             LOG.error("Unable to determine Action for ResourceType {} with ResourceKey {}", resourceType,
169                                     actResource.getKey());
170                     }
171                 }
172
173                 long start = System.currentTimeMillis();
174                 CheckedFuture<Void, TransactionCommitFailedException> futures = tx.submit();
175
176                 try
177                 {
178                     futures.get();
179                     long time = System.currentTimeMillis() - start;
180
181                     LOG.info( " ##### Time taken for " + actResourceList.size() + " = " + time);
182
183                 } catch (InterruptedException | ExecutionException e)
184                 {
185                     LOG.error("Exception occurred while writing to datastore: " + e);
186                 }
187
188             } catch (final Exception e)
189             {
190                 LOG.error("Transaction submission failed: ", e);
191             }
192         }
193     }
194 }