BUG 7494 : Idmanager returns the same Id from the same pool for different
[genius.git] / idmanager / idmanager-impl / src / main / java / org / opendaylight / genius / idmanager / IdManager.java
1 /*
2  * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.genius.idmanager;
10
11 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
12
13 import com.google.common.base.Optional;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import java.util.ArrayList;
16 import java.util.Collections;
17 import java.util.HashMap;
18 import java.util.LinkedList;
19 import java.util.List;
20 import java.util.Map;
21 import java.util.Timer;
22 import java.util.TimerTask;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.ConcurrentMap;
25 import java.util.concurrent.CountDownLatch;
26 import java.util.concurrent.Future;
27 import java.util.concurrent.TimeUnit;
28 import java.util.stream.Collectors;
29 import javax.annotation.PostConstruct;
30 import javax.annotation.PreDestroy;
31 import javax.inject.Inject;
32 import javax.inject.Singleton;
33 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
34 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
35 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
36 import org.opendaylight.genius.datastoreutils.DataStoreJobCoordinator;
37 import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
38 import org.opendaylight.genius.idmanager.ReleasedIdHolder.DelayedIdEntry;
39 import org.opendaylight.genius.idmanager.api.IdManagerMonitor;
40 import org.opendaylight.genius.idmanager.jobs.CleanUpJob;
41 import org.opendaylight.genius.idmanager.jobs.IdHolderSyncJob;
42 import org.opendaylight.genius.idmanager.jobs.LocalPoolCreateJob;
43 import org.opendaylight.genius.idmanager.jobs.LocalPoolDeleteJob;
44 import org.opendaylight.genius.idmanager.jobs.UpdateIdEntryJob;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdInput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdOutput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdOutputBuilder;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdRangeInput;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdRangeOutput;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdRangeOutputBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.CreateIdPoolInput;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.DeleteIdPoolInput;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.IdManagerService;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.IdPools;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.ReleaseIdInput;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.IdPool;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.IdPoolBuilder;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.IdPoolKey;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.AvailableIdsHolder;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.AvailableIdsHolderBuilder;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.ChildPools;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.IdEntries;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.ReleasedIdsHolder;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.ReleasedIdsHolderBuilder;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.released.ids.DelayedIdEntries;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.LockManagerService;
67 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
68 import org.opendaylight.yangtools.yang.common.OperationFailedException;
69 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
70 import org.opendaylight.yangtools.yang.common.RpcResult;
71 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
72 import org.slf4j.Logger;
73 import org.slf4j.LoggerFactory;
74
75 @Singleton
76 public class IdManager implements IdManagerService, IdManagerMonitor {
77
78     private static final Logger LOG = LoggerFactory.getLogger(IdManager.class);
79     private static final long DEFAULT_IDLE_TIME = 24 * 60 * 60;
80
81     private final DataBroker broker;
82     private final SingleTransactionDataBroker singleTxDB;
83     private final LockManagerService lockManager;
84     private final IdUtils idUtils;
85
86     private final ConcurrentMap<String, IdLocalPool> localPool;
87     private final Timer cleanJobTimer = new Timer();
88
89     @Inject
90     public IdManager(DataBroker db, LockManagerService lockManager, IdUtils idUtils) throws ReadFailedException {
91         this.broker = db;
92         this.singleTxDB = new SingleTransactionDataBroker(db);
93         this.lockManager = lockManager;
94         this.idUtils = idUtils;
95         this.localPool = new ConcurrentHashMap<>();
96         populateCache();
97     }
98
99     @Override
100     public Map<String, String> getLocalPoolsDetails() {
101         Map<String, String> map = new HashMap<>();
102         localPool.entrySet().stream().forEach(entry -> map.put(entry.getKey(), entry.getValue().toString()));
103         return map;
104     }
105
106     @PostConstruct
107     public void start() {
108         LOG.info("{} start", getClass().getSimpleName());
109     }
110
111     @PreDestroy
112     public void close() throws Exception {
113         LOG.info("{} close", getClass().getSimpleName());
114     }
115
116     private void populateCache() throws ReadFailedException {
117         // If IP changes during reboot, then there will be orphaned child pools.
118         InstanceIdentifier<IdPools> idPoolsInstance = idUtils.getIdPools();
119         Optional<IdPools> idPoolsOptional = singleTxDB.syncReadOptional(CONFIGURATION, idPoolsInstance);
120         if (!idPoolsOptional.isPresent()) {
121             return;
122         }
123         IdPools idPools = idPoolsOptional.get();
124         List<IdPool> idPoolList = idPools.getIdPool();
125         idPoolList
126                 .parallelStream()
127                 .filter(idPool -> idPool.getParentPoolName() != null
128                         && !idPool.getParentPoolName().isEmpty()
129                         && idUtils.getLocalPoolName(idPool.getParentPoolName())
130                                 .equals(idPool.getPoolName()))
131                 .forEach(
132                     idPool -> updateLocalIdPoolCache(idPool,
133                         idPool.getParentPoolName()));
134     }
135
136     public boolean updateLocalIdPoolCache(IdPool idPool, String parentPoolName) {
137         AvailableIdsHolder availableIdsHolder = idPool.getAvailableIdsHolder();
138         AvailableIdHolder availableIdHolder = new AvailableIdHolder(idUtils, availableIdsHolder.getStart(),
139                 availableIdsHolder.getEnd());
140         availableIdHolder.setCur(availableIdsHolder.getCursor());
141         ReleasedIdsHolder releasedIdsHolder = idPool.getReleasedIdsHolder();
142         ReleasedIdHolder releasedIdHolder = new ReleasedIdHolder(idUtils, releasedIdsHolder.getDelayedTimeSec());
143         releasedIdHolder.setAvailableIdCount(releasedIdsHolder.getAvailableIdCount());
144         List<DelayedIdEntries> delayedEntries = releasedIdsHolder.getDelayedIdEntries();
145         List<DelayedIdEntry> delayedIdEntryInCache = new ArrayList<>();
146         if (delayedEntries != null) {
147             delayedIdEntryInCache = delayedEntries
148                     .parallelStream()
149                     .map(delayedIdEntry -> new DelayedIdEntry(delayedIdEntry
150                             .getId(), delayedIdEntry.getReadyTimeSec()))
151                             .sorted((idEntry1, idEntry2) -> Long.compare(idEntry1.getReadyTimeSec(),
152                                     idEntry2.getReadyTimeSec())).collect(Collectors.toList());
153         }
154         releasedIdHolder.setDelayedEntries(delayedIdEntryInCache);
155
156         IdLocalPool idLocalPool = new IdLocalPool(idUtils, idPool.getPoolName());
157         idLocalPool.setAvailableIds(availableIdHolder);
158         idLocalPool.setReleasedIds(releasedIdHolder);
159         localPool.put(parentPoolName, idLocalPool);
160         if (LOG.isDebugEnabled()) {
161             LOG.debug("Populating cache for {} with {}", idLocalPool.getPoolName(), idLocalPool);
162         }
163         return true;
164     }
165
166     @Override
167     public Future<RpcResult<Void>> createIdPool(CreateIdPoolInput input) {
168         if (LOG.isDebugEnabled()) {
169             LOG.debug("createIdPool called with input {}", input);
170         }
171         String poolName = input.getPoolName();
172         long low = input.getLow();
173         long high = input.getHigh();
174         long blockSize = idUtils.computeBlockSize(low, high);
175         Future<RpcResult<Void>> futureResult;
176         try {
177             idUtils.lockPool(lockManager, poolName);
178             WriteTransaction tx = broker.newWriteOnlyTransaction();
179             poolName = poolName.intern();
180             IdPool idPool;
181             idPool = createGlobalPool(tx, poolName, low, high, blockSize);
182             String localPoolName = idUtils.getLocalPoolName(poolName);
183             IdLocalPool idLocalPool = localPool.get(poolName);
184             if (idLocalPool == null) {
185                 createLocalPool(tx, localPoolName, idPool);
186                 idUtils.updateChildPool(tx, idPool.getPoolName(), localPoolName);
187             }
188             tx.submit().checkedGet();
189             futureResult = RpcResultBuilder.<Void>success().buildFuture();
190         } catch (OperationFailedException | IdManagerException e) {
191             futureResult = buildFailedRpcResultFuture("createIdPool failed: " + input.toString(), e);
192         } finally {
193             try {
194                 idUtils.unlockPool(lockManager, poolName);
195             } catch (IdManagerException e) {
196                 futureResult = buildFailedRpcResultFuture("createIdPool unlockPool() failed: " + input.toString(), e);
197             }
198         }
199         return futureResult;
200     }
201
202     @Override
203     public Future<RpcResult<AllocateIdOutput>> allocateId(AllocateIdInput input) {
204         if (LOG.isDebugEnabled()) {
205             LOG.debug("AllocateId called with input {}", input);
206         }
207         String idKey = input.getIdKey();
208         String poolName = input.getPoolName();
209         String localPoolName = idUtils.getLocalPoolName(poolName);
210         long newIdValue = -1;
211         AllocateIdOutputBuilder output = new AllocateIdOutputBuilder();
212         Future<RpcResult<AllocateIdOutput>> futureResult;
213         try {
214             //allocateIdFromLocalPool method returns a list of IDs with one element. This element is obtained by get(0)
215             newIdValue = allocateIdFromLocalPool(poolName, localPoolName, idKey, 1).get(0);
216             output.setIdValue(newIdValue);
217             futureResult = RpcResultBuilder.<AllocateIdOutput>success().withResult(output.build()).buildFuture();
218         } catch (OperationFailedException | IdManagerException e) {
219             futureResult = buildFailedRpcResultFuture("allocateId failed: " + input.toString(), e);
220         }
221         return futureResult;
222     }
223
224     @Override
225     public Future<RpcResult<AllocateIdRangeOutput>> allocateIdRange(AllocateIdRangeInput input) {
226         if (LOG.isDebugEnabled()) {
227             LOG.debug("AllocateIdRange called with input {}", input);
228         }
229         String idKey = input.getIdKey();
230         String poolName = input.getPoolName();
231         long size = input.getSize();
232         String localPoolName = idUtils.getLocalPoolName(poolName);
233         List<Long> newIdValuesList = new ArrayList<>();
234         AllocateIdRangeOutputBuilder output = new AllocateIdRangeOutputBuilder();
235         Future<RpcResult<AllocateIdRangeOutput>> futureResult;
236         try {
237             newIdValuesList = allocateIdFromLocalPool(poolName, localPoolName, idKey, size);
238             Collections.sort(newIdValuesList);
239             output.setIdValues(newIdValuesList);
240             futureResult = RpcResultBuilder.<AllocateIdRangeOutput>success().withResult(output.build()).buildFuture();
241         } catch (OperationFailedException | IdManagerException e) {
242             futureResult = buildFailedRpcResultFuture("allocateIdRange failed: " + input.toString(), e);
243         }
244         return futureResult;
245     }
246
247     @Override
248     public Future<RpcResult<Void>> deleteIdPool(DeleteIdPoolInput input) {
249         if (LOG.isDebugEnabled()) {
250             LOG.debug("DeleteIdPool called with input {}", input);
251         }
252         String poolName = input.getPoolName();
253         Future<RpcResult<Void>> futureResult;
254         try {
255             InstanceIdentifier<IdPool> idPoolToBeDeleted = idUtils.getIdPoolInstance(poolName);
256             poolName = poolName.intern();
257             synchronized (poolName) {
258                 IdPool idPool = singleTxDB.syncRead(CONFIGURATION, idPoolToBeDeleted);
259                 List<ChildPools> childPoolList = idPool.getChildPools();
260                 if (childPoolList != null) {
261                     childPoolList.parallelStream().forEach(childPool -> deletePool(childPool.getChildPoolName()));
262                 }
263                 singleTxDB.syncDelete(CONFIGURATION, idPoolToBeDeleted);
264                 if (LOG.isDebugEnabled()) {
265                     LOG.debug("Deleted id pool {}", poolName);
266                 }
267             }
268             futureResult = RpcResultBuilder.<Void>success().buildFuture();
269         } catch (OperationFailedException e) {
270             futureResult = buildFailedRpcResultFuture("deleteIdPool failed: " + input.toString(), e);
271         }
272         return futureResult;
273     }
274
275     @Override
276     public Future<RpcResult<Void>> releaseId(ReleaseIdInput input) {
277         String poolName = input.getPoolName();
278         String idKey = input.getIdKey();
279         Future<RpcResult<Void>> futureResult;
280         try {
281             releaseIdFromLocalPool(poolName, idUtils.getLocalPoolName(poolName), idKey);
282             futureResult = RpcResultBuilder.<Void>success().buildFuture();
283         } catch (ReadFailedException | IdManagerException e) {
284             futureResult = buildFailedRpcResultFuture("releaseId failed: " + input.toString(), e);
285         }
286         return futureResult;
287     }
288
289     private <T> ListenableFuture<RpcResult<T>> buildFailedRpcResultFuture(String msg, Exception exception) {
290         LOG.error(msg, exception);
291         RpcResultBuilder<T> failedRpcResultBuilder = RpcResultBuilder.failed();
292         failedRpcResultBuilder.withError(ErrorType.APPLICATION, msg, exception);
293         if (exception instanceof OperationFailedException) {
294             failedRpcResultBuilder.withRpcErrors(((OperationFailedException) exception).getErrorList());
295         }
296         return failedRpcResultBuilder.buildFuture();
297     }
298
299     private List<Long> allocateIdFromLocalPool(String parentPoolName, String localPoolName, String idKey, long size)
300             throws OperationFailedException, IdManagerException {
301         if (LOG.isDebugEnabled()) {
302             LOG.debug("Allocating id from local pool {}. Parent pool {}. Idkey {}", localPoolName, parentPoolName,
303                     idKey);
304         }
305         long newIdValue = -1;
306         List<Long> newIdValuesList = new ArrayList<>();
307         localPoolName = localPoolName.intern();
308         InstanceIdentifier<IdPool> parentIdPoolInstanceIdentifier = idUtils.getIdPoolInstance(parentPoolName);
309         InstanceIdentifier<IdEntries> existingId = idUtils.getIdEntry(parentIdPoolInstanceIdentifier, idKey);
310         Optional<IdEntries> existingIdEntry = singleTxDB.syncReadOptional(CONFIGURATION, existingId);
311         if (existingIdEntry.isPresent()) {
312             newIdValuesList = existingIdEntry.get().getIdValue();
313             if (LOG.isDebugEnabled()) {
314                 LOG.debug("Existing ids {} for the key {} ", newIdValuesList, idKey);
315             }
316             return newIdValuesList;
317         }
318         //This get will not help in concurrent reads. Hence the same read needs to be done again.
319         IdLocalPool localIdPool = localPool.get(parentPoolName);
320         if (localIdPool == null) {
321             idUtils.lockPool(lockManager, parentPoolName);
322             try {
323                 //Check if a previous thread that got the cluster-wide lock first, has created the localPool
324                 if (localPool.get(parentPoolName) == null) {
325                     WriteTransaction tx = broker.newWriteOnlyTransaction();
326                     IdPool parentIdPool = singleTxDB.syncRead(CONFIGURATION, parentIdPoolInstanceIdentifier);
327                     localIdPool = createLocalPool(tx, localPoolName, parentIdPool); // Return localIdPool.....
328                     tx.submit().checkedGet();
329                 } else {
330                     localIdPool = localPool.get(parentPoolName);
331                 }
332             } finally {
333                 idUtils.unlockPool(lockManager, parentPoolName);
334             }
335         }
336         if (LOG.isDebugEnabled()) {
337             LOG.debug("Got pool {}", localIdPool);
338         }
339         if (size == 1) {
340             newIdValue = getIdFromLocalPoolCache(localIdPool, parentPoolName);
341             newIdValuesList.add(newIdValue);
342         } else {
343             IdPool parentIdPool = singleTxDB.syncRead(CONFIGURATION, parentIdPoolInstanceIdentifier);
344             long totalAvailableIdCount = localIdPool.getAvailableIds().getAvailableIdCount()
345                     + localIdPool.getReleasedIds().getAvailableIdCount();
346             AvailableIdsHolderBuilder availableParentIds = idUtils.getAvailableIdsHolderBuilder(parentIdPool);
347             ReleasedIdsHolderBuilder releasedParentIds = idUtils.getReleaseIdsHolderBuilder(parentIdPool);
348             totalAvailableIdCount = totalAvailableIdCount + releasedParentIds.getAvailableIdCount()
349                     + idUtils.getAvailableIdsCount(availableParentIds);
350             if (totalAvailableIdCount > size) {
351                 while (size > 0) {
352                     try {
353                         newIdValue = getIdFromLocalPoolCache(localIdPool, parentPoolName);
354                     } catch (OperationFailedException e) {
355                         if (LOG.isDebugEnabled()) {
356                             LOG.debug("Releasing IDs to pool {}", localPoolName);
357                         }
358                         // Releasing the IDs added in newIdValuesList since a null list would be returned now, as the
359                         // requested size of list IDs exceeds the number of available IDs.
360                         updateDelayedEntriesInLocalCache(newIdValuesList, parentPoolName, localIdPool);
361                     }
362                     newIdValuesList.add(newIdValue);
363                     size--;
364                 }
365             } else {
366                 throw new IdManagerException(String.format("Ids exhausted for pool : %s", parentPoolName));
367             }
368         }
369         if (LOG.isDebugEnabled()) {
370             LOG.debug("The newIdValues {} for the idKey {}", newIdValuesList, idKey);
371         }
372         idUtils.releaseIdLatchMap.put(parentPoolName + idKey, new CountDownLatch(1));
373         UpdateIdEntryJob job = new UpdateIdEntryJob(parentPoolName, localPoolName, idKey, newIdValuesList, broker,
374                 idUtils);
375         DataStoreJobCoordinator.getInstance().enqueueJob(parentPoolName, job, IdUtils.RETRY_COUNT);
376         return newIdValuesList;
377     }
378
379     private Long getIdFromLocalPoolCache(IdLocalPool localIdPool, String parentPoolName)
380             throws OperationFailedException, IdManagerException {
381         while (true) {
382             IdHolder releasedIds = localIdPool.getReleasedIds();
383             Optional<Long> releasedId = releasedIds.allocateId();
384             if (releasedId.isPresent()) {
385                 IdHolderSyncJob poolSyncJob = new IdHolderSyncJob(localIdPool.getPoolName(), releasedIds, broker,
386                         idUtils);
387                 DataStoreJobCoordinator.getInstance().enqueueJob(localIdPool.getPoolName(), poolSyncJob,
388                         IdUtils.RETRY_COUNT);
389                 return releasedId.get();
390             }
391             IdHolder availableIds = localIdPool.getAvailableIds();
392             if (availableIds != null) {
393                 Optional<Long> availableId = availableIds.allocateId();
394                 if (availableId.isPresent()) {
395                     IdHolderSyncJob poolSyncJob = new IdHolderSyncJob(localIdPool.getPoolName(), availableIds, broker,
396                             idUtils);
397                     DataStoreJobCoordinator.getInstance().enqueueJob(localIdPool.getPoolName(), poolSyncJob,
398                             IdUtils.RETRY_COUNT);
399                     return availableId.get();
400                 }
401             }
402             long idCount = getIdBlockFromParentPool(parentPoolName, localIdPool);
403             if (idCount <= 0) {
404                 if (LOG.isDebugEnabled()) {
405                     LOG.debug("Unable to allocate Id block from global pool");
406                 }
407                 throw new IdManagerException(String.format("Ids exhausted for pool : %s", parentPoolName));
408             }
409         }
410     }
411
412     /**
413      * Changes made to availableIds and releasedIds will not be persisted to the datastore.
414      */
415     private long getIdBlockFromParentPool(String parentPoolName, IdLocalPool localIdPool)
416             throws OperationFailedException, IdManagerException {
417         if (LOG.isDebugEnabled()) {
418             LOG.debug("Allocating block of id from parent pool {}", parentPoolName);
419         }
420         InstanceIdentifier<IdPool> idPoolInstanceIdentifier = idUtils.getIdPoolInstance(parentPoolName);
421         parentPoolName = parentPoolName.intern();
422         idUtils.lockPool(lockManager, parentPoolName);
423         long idCount = 0;
424         try {
425             // Check if the childpool already got id block.
426             long availableIdCount =
427                     localIdPool.getAvailableIds().getAvailableIdCount()
428                             + localIdPool.getReleasedIds().getAvailableIdCount();
429             if (availableIdCount > 0) {
430                 return availableIdCount;
431             }
432             WriteTransaction tx = broker.newWriteOnlyTransaction();
433             IdPool parentIdPool = singleTxDB.syncRead(CONFIGURATION, idPoolInstanceIdentifier);
434             idCount = allocateIdBlockFromParentPool(localIdPool, parentIdPool, tx);
435             tx.submit().checkedGet();
436         } catch (IdManagerException | NullPointerException e) {
437             LOG.error("Error getting id block from parent pool. {}", e.getMessage());
438         } finally {
439             idUtils.unlockPool(lockManager, parentPoolName);
440         }
441         return idCount;
442     }
443
444     private long allocateIdBlockFromParentPool(IdLocalPool localPoolCache, IdPool parentIdPool, WriteTransaction tx)
445             throws OperationFailedException, IdManagerException {
446         long idCount = -1;
447         ReleasedIdsHolderBuilder releasedIdsBuilderParent = idUtils.getReleaseIdsHolderBuilder(parentIdPool);
448         while (true) {
449             idCount = allocateIdBlockFromReleasedIdsHolder(localPoolCache, releasedIdsBuilderParent, parentIdPool, tx);
450             if (idCount > 0) {
451                 return idCount;
452             }
453             idCount = allocateIdBlockFromAvailableIdsHolder(localPoolCache, parentIdPool, tx);
454             if (idCount > 0) {
455                 return idCount;
456             }
457             idCount = getIdsFromOtherChildPools(releasedIdsBuilderParent, parentIdPool);
458             if (idCount <= 0) {
459                 if (LOG.isDebugEnabled()) {
460                     LOG.debug("Unable to allocate Id block from global pool");
461                 }
462                 throw new IdManagerException(String.format("Ids exhausted for pool : %s", parentIdPool.getPoolName()));
463             }
464         }
465     }
466
467     private long getIdsFromOtherChildPools(ReleasedIdsHolderBuilder releasedIdsBuilderParent, IdPool parentIdPool)
468             throws OperationFailedException {
469         List<ChildPools> childPoolsList = parentIdPool.getChildPools();
470         // Sorting the child pools on last accessed time so that the pool that
471         // was not accessed for a long time comes first.
472         Collections.sort(childPoolsList,
473             (childPool1, childPool2) -> childPool1.getLastAccessTime().compareTo(childPool2.getLastAccessTime()));
474         long currentTime = System.currentTimeMillis() / 1000;
475         for (ChildPools childPools : childPoolsList) {
476             if (childPools.getLastAccessTime() + DEFAULT_IDLE_TIME > currentTime) {
477                 break;
478             }
479             if (!childPools.getChildPoolName().equals(idUtils.getLocalPoolName(parentIdPool.getPoolName()))) {
480                 InstanceIdentifier<IdPool> idPoolInstanceIdentifier = idUtils
481                         .getIdPoolInstance(childPools.getChildPoolName());
482                 IdPool otherChildPool = singleTxDB.syncRead(CONFIGURATION, idPoolInstanceIdentifier);
483                 ReleasedIdsHolderBuilder releasedIds = idUtils.getReleaseIdsHolderBuilder(otherChildPool);
484
485                 List<DelayedIdEntries> delayedIdEntriesChild = releasedIds.getDelayedIdEntries();
486                 List<DelayedIdEntries> delayedIdEntriesParent = releasedIdsBuilderParent.getDelayedIdEntries();
487                 if (delayedIdEntriesParent == null) {
488                     delayedIdEntriesParent = new LinkedList<>();
489                 }
490                 delayedIdEntriesParent.addAll(delayedIdEntriesChild);
491                 delayedIdEntriesChild.removeAll(delayedIdEntriesChild);
492
493                 AvailableIdsHolderBuilder availableIds = idUtils.getAvailableIdsHolderBuilder(otherChildPool);
494                 while (idUtils.isIdAvailable(availableIds)) {
495                     long cursor = availableIds.getCursor() + 1;
496                     delayedIdEntriesParent.add(idUtils.createDelayedIdEntry(cursor, currentTime));
497                     availableIds.setCursor(cursor);
498                 }
499
500                 long totalAvailableIdCount = releasedIds.getDelayedIdEntries().size()
501                         + idUtils.getAvailableIdsCount(availableIds);
502                 long count = releasedIdsBuilderParent.getAvailableIdCount() + totalAvailableIdCount;
503                 releasedIdsBuilderParent.setDelayedIdEntries(delayedIdEntriesParent).setAvailableIdCount(count);
504                 singleTxDB.syncUpdate(CONFIGURATION, idPoolInstanceIdentifier,
505                         new IdPoolBuilder().setKey(new IdPoolKey(otherChildPool.getPoolName()))
506                                 .setAvailableIdsHolder(availableIds.build()).setReleasedIdsHolder(releasedIds.build())
507                                 .build());
508                 return totalAvailableIdCount;
509             }
510         }
511         return 0;
512     }
513
514     private long allocateIdBlockFromReleasedIdsHolder(IdLocalPool localIdPool,
515             ReleasedIdsHolderBuilder releasedIdsBuilderParent, IdPool parentIdPool, WriteTransaction tx) {
516         if (releasedIdsBuilderParent.getAvailableIdCount() == 0) {
517             if (LOG.isDebugEnabled()) {
518                 LOG.debug("Ids unavailable in releasedIds of parent pool {}", parentIdPool);
519             }
520             return 0;
521         }
522         List<DelayedIdEntries> delayedIdEntriesParent = releasedIdsBuilderParent.getDelayedIdEntries();
523         int idCount = Math.min(delayedIdEntriesParent.size(), parentIdPool.getBlockSize());
524         List<DelayedIdEntries> idEntriesToBeRemoved = delayedIdEntriesParent.subList(0, idCount);
525         ReleasedIdHolder releasedIds = (ReleasedIdHolder) localIdPool.getReleasedIds();
526         List<DelayedIdEntry> delayedIdEntriesLocalCache = releasedIds.getDelayedEntries();
527         delayedIdEntriesLocalCache = idEntriesToBeRemoved
528                 .parallelStream()
529                 .map(delayedIdEntry -> new DelayedIdEntry(delayedIdEntry
530                         .getId(), delayedIdEntry.getReadyTimeSec()))
531                 .sorted((idEntry1, idEntry2) -> Long.compare(idEntry1.getReadyTimeSec(),
532                         idEntry2.getReadyTimeSec())).collect(Collectors.toList());
533         releasedIds.setDelayedEntries(delayedIdEntriesLocalCache);
534         releasedIds.setAvailableIdCount(releasedIds.getAvailableIdCount() + idCount);
535         localIdPool.setReleasedIds(releasedIds);
536         delayedIdEntriesParent.removeAll(idEntriesToBeRemoved);
537         releasedIdsBuilderParent.setDelayedIdEntries(delayedIdEntriesParent);
538         InstanceIdentifier<ReleasedIdsHolder> releasedIdsHolderInstanceIdentifier = InstanceIdentifier
539                 .builder(IdPools.class).child(IdPool.class,
540                         new IdPoolKey(parentIdPool.getPoolName())).child(ReleasedIdsHolder.class).build();
541         releasedIdsBuilderParent.setAvailableIdCount(releasedIdsBuilderParent.getAvailableIdCount() - idCount);
542         if (LOG.isDebugEnabled()) {
543             LOG.debug("Allocated {} ids from releasedIds of parent pool {}", idCount, parentIdPool);
544         }
545         tx.merge(CONFIGURATION, releasedIdsHolderInstanceIdentifier,
546                 releasedIdsBuilderParent.build(), true);
547         return idCount;
548     }
549
550     private long allocateIdBlockFromAvailableIdsHolder(IdLocalPool localIdPool, IdPool parentIdPool,
551             WriteTransaction tx) {
552         long idCount = 0;
553         AvailableIdsHolderBuilder availableIdsBuilderParent = idUtils.getAvailableIdsHolderBuilder(parentIdPool);
554         long end = availableIdsBuilderParent.getEnd();
555         long cur = availableIdsBuilderParent.getCursor();
556         if (!idUtils.isIdAvailable(availableIdsBuilderParent)) {
557             if (LOG.isDebugEnabled()) {
558                 LOG.debug("Ids exhausted in parent pool {}", parentIdPool);
559             }
560             return idCount;
561         }
562         // Update availableIdsHolder of Local Pool
563         idCount = Math.min(end - cur, parentIdPool.getBlockSize());
564         AvailableIdHolder availableIds = new AvailableIdHolder(idUtils, cur + 1, cur + idCount);
565         localIdPool.setAvailableIds(availableIds);
566         // Update availableIdsHolder of Global Pool
567         InstanceIdentifier<AvailableIdsHolder> availableIdsHolderInstanceIdentifier = InstanceIdentifier
568                 .builder(IdPools.class).child(IdPool.class,
569                         new IdPoolKey(parentIdPool.getPoolName())).child(AvailableIdsHolder.class).build();
570         availableIdsBuilderParent.setCursor(cur + idCount);
571         if (LOG.isDebugEnabled()) {
572             LOG.debug("Allocated {} ids from availableIds of global pool {}", idCount, parentIdPool);
573         }
574         tx.merge(CONFIGURATION, availableIdsHolderInstanceIdentifier,
575                 availableIdsBuilderParent.build(), true);
576         return idCount;
577     }
578
579     private void releaseIdFromLocalPool(String parentPoolName, String localPoolName, String idKey)
580             throws ReadFailedException, IdManagerException {
581         String idLatchKey = parentPoolName + idKey;
582         java.util.Optional.ofNullable(idUtils.releaseIdLatchMap.get(idLatchKey)).ifPresent(latch -> {
583             try {
584                 latch.await(5, TimeUnit.SECONDS);
585             } catch (InterruptedException ignored) {
586                 LOG.warn("Thread interrupted while releasing id {} from id pool {}", idKey, parentPoolName);
587             } finally {
588                 idUtils.releaseIdLatchMap.remove(idLatchKey);
589             }
590         } );
591         localPoolName = localPoolName.intern();
592         InstanceIdentifier<IdPool> parentIdPoolInstanceIdentifier = idUtils.getIdPoolInstance(parentPoolName);
593         IdPool parentIdPool = singleTxDB.syncRead(CONFIGURATION, parentIdPoolInstanceIdentifier);
594         List<IdEntries> idEntries = parentIdPool.getIdEntries();
595         List<IdEntries> newIdEntries = idEntries;
596         if (idEntries == null) {
597             throw new IdManagerException("Id Entries does not exist");
598         }
599         InstanceIdentifier<IdEntries> existingId = idUtils.getIdEntry(parentIdPoolInstanceIdentifier, idKey);
600         Optional<IdEntries> existingIdEntryObject = singleTxDB.syncReadOptional(CONFIGURATION, existingId);
601         if (!existingIdEntryObject.isPresent()) {
602             throw new IdManagerException(
603                     String.format("Specified Id key %s does not exist in id pool %s", idKey, parentPoolName));
604         }
605         IdEntries existingIdEntry = existingIdEntryObject.get();
606         List<Long> idValuesList = existingIdEntry.getIdValue();
607         IdLocalPool localIdPoolCache = localPool.get(parentPoolName);
608         boolean isRemoved = newIdEntries.remove(existingIdEntry);
609         if (LOG.isDebugEnabled()) {
610             LOG.debug("The entry {} is removed {}", existingIdEntry, isRemoved);
611         }
612         updateDelayedEntriesInLocalCache(idValuesList, parentPoolName, localIdPoolCache);
613         IdHolderSyncJob poolSyncJob = new IdHolderSyncJob(localPoolName, localIdPoolCache.getReleasedIds(), broker,
614                 idUtils);
615         DataStoreJobCoordinator.getInstance().enqueueJob(localPoolName, poolSyncJob, IdUtils.RETRY_COUNT);
616         scheduleCleanUpTask(localIdPoolCache, parentPoolName, parentIdPool.getBlockSize());
617         if (LOG.isDebugEnabled()) {
618             LOG.debug("Released id ({}, {}) from pool {}", idKey, idValuesList, localPoolName);
619         }
620         // Updating id entries in the parent pool. This will be used for restart scenario
621         UpdateIdEntryJob job = new UpdateIdEntryJob(parentPoolName, localPoolName, idKey, null, broker, idUtils);
622         DataStoreJobCoordinator.getInstance().enqueueJob(parentPoolName, job, IdUtils.RETRY_COUNT);
623     }
624
625     private void scheduleCleanUpTask(final IdLocalPool localIdPoolCache,
626             final String parentPoolName, final int blockSize) {
627         TimerTask scheduledTask = new TimerTask() {
628             @Override
629             public void run() {
630                 CleanUpJob job = new CleanUpJob(localIdPoolCache, broker, parentPoolName, blockSize, lockManager,
631                         idUtils);
632                 DataStoreJobCoordinator.getInstance().enqueueJob(localIdPoolCache.getPoolName(), job,
633                         IdUtils.RETRY_COUNT);
634             }
635         };
636         cleanJobTimer.schedule(scheduledTask, IdUtils.DEFAULT_DELAY_TIME * 1000);
637     }
638
639     private IdPool createGlobalPool(WriteTransaction tx, String poolName, long low, long high, long blockSize)
640             throws ReadFailedException {
641         IdPool idPool;
642         InstanceIdentifier<IdPool> idPoolInstanceIdentifier = idUtils.getIdPoolInstance(poolName);
643         Optional<IdPool> existingIdPool = singleTxDB.syncReadOptional(CONFIGURATION, idPoolInstanceIdentifier);
644         if (!existingIdPool.isPresent()) {
645             if (LOG.isDebugEnabled()) {
646                 LOG.debug("Creating new global pool {}", poolName);
647             }
648             idPool = idUtils.createGlobalPool(poolName, low, high, blockSize);
649             tx.put(CONFIGURATION, idPoolInstanceIdentifier, idPool, true);
650         } else {
651             idPool = existingIdPool.get();
652             if (LOG.isDebugEnabled()) {
653                 LOG.debug("GlobalPool exists {}", idPool);
654             }
655         }
656         return idPool;
657     }
658
659     private IdLocalPool createLocalPool(WriteTransaction tx, String localPoolName, IdPool idPool)
660             throws OperationFailedException, IdManagerException {
661         localPoolName = localPoolName.intern();
662         IdLocalPool idLocalPool = new IdLocalPool(idUtils, localPoolName);
663         allocateIdBlockFromParentPool(idLocalPool, idPool, tx);
664         String parentPool = idPool.getPoolName();
665         localPool.put(parentPool, idLocalPool);
666         LocalPoolCreateJob job = new LocalPoolCreateJob(idLocalPool, broker, idPool.getPoolName(),
667                 idPool.getBlockSize(), idUtils);
668         DataStoreJobCoordinator.getInstance().enqueueJob(localPoolName, job, IdUtils.RETRY_COUNT);
669         return idLocalPool;
670     }
671
672     private void deletePool(String poolName) {
673         LocalPoolDeleteJob job = new LocalPoolDeleteJob(poolName, broker, idUtils);
674         DataStoreJobCoordinator.getInstance().enqueueJob(poolName, job, IdUtils.RETRY_COUNT);
675     }
676
677     public void poolDeleted(String parentPoolName, String poolName) {
678         IdLocalPool idLocalPool = localPool.get(parentPoolName);
679         if (idLocalPool != null) {
680             if (idLocalPool.getPoolName().equals(poolName)) {
681                 localPool.remove(parentPoolName);
682             }
683         }
684     }
685
686     private void updateDelayedEntriesInLocalCache(List<Long> idsList, String parentPoolName,
687             IdLocalPool localPoolCache) {
688         for (long idValue : idsList) {
689             localPoolCache.getReleasedIds().addId(idValue);
690         }
691         localPool.put(parentPoolName, localPoolCache);
692     }
693
694 }