Merge "Change externalId tep param dpn-br-name to br-name"
[genius.git] / idmanager / idmanager-impl / src / main / java / org / opendaylight / genius / idmanager / IdManager.java
1 /*
2  * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.genius.idmanager;
10
11 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
12
13 import com.google.common.base.Optional;
14 import com.google.common.util.concurrent.ListenableFuture;
15
16 import java.util.ArrayList;
17 import java.util.Collections;
18 import java.util.HashMap;
19 import java.util.LinkedList;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.Timer;
23 import java.util.TimerTask;
24 import java.util.concurrent.CompletableFuture;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ConcurrentMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.CountDownLatch;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.Future;
31 import java.util.concurrent.TimeUnit;
32 import java.util.stream.Collectors;
33
34 import javax.annotation.PostConstruct;
35 import javax.annotation.PreDestroy;
36 import javax.inject.Inject;
37 import javax.inject.Singleton;
38
39 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
40 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
41 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
42 import org.opendaylight.genius.datastoreutils.DataStoreJobCoordinator;
43 import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
44 import org.opendaylight.genius.idmanager.ReleasedIdHolder.DelayedIdEntry;
45 import org.opendaylight.genius.idmanager.api.IdManagerMonitor;
46 import org.opendaylight.genius.idmanager.jobs.CleanUpJob;
47 import org.opendaylight.genius.idmanager.jobs.IdHolderSyncJob;
48 import org.opendaylight.genius.idmanager.jobs.LocalPoolCreateJob;
49 import org.opendaylight.genius.idmanager.jobs.LocalPoolDeleteJob;
50 import org.opendaylight.genius.idmanager.jobs.UpdateIdEntryJob;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdInput;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdOutput;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdOutputBuilder;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdRangeInput;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdRangeOutput;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdRangeOutputBuilder;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.CreateIdPoolInput;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.DeleteIdPoolInput;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.IdManagerService;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.IdPools;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.ReleaseIdInput;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.IdPool;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.IdPoolBuilder;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.IdPoolKey;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.AvailableIdsHolder;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.AvailableIdsHolderBuilder;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.ChildPools;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.IdEntries;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.ReleasedIdsHolder;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.ReleasedIdsHolderBuilder;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.released.ids.DelayedIdEntries;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.LockManagerService;
73 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
74 import org.opendaylight.yangtools.yang.common.OperationFailedException;
75 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
76 import org.opendaylight.yangtools.yang.common.RpcResult;
77 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
78 import org.slf4j.Logger;
79 import org.slf4j.LoggerFactory;
80
81 @Singleton
82 public class IdManager implements IdManagerService, IdManagerMonitor {
83
84     private static final Logger LOG = LoggerFactory.getLogger(IdManager.class);
85     private static final long DEFAULT_IDLE_TIME = 24 * 60 * 60;
86
87     private final DataBroker broker;
88     private final SingleTransactionDataBroker singleTxDB;
89     private final LockManagerService lockManager;
90     private final IdUtils idUtils;
91
92     private final ConcurrentMap<String, IdLocalPool> localPool;
93     private final Timer cleanJobTimer = new Timer();
94
95     @Inject
96     public IdManager(DataBroker db, LockManagerService lockManager, IdUtils idUtils) throws ReadFailedException {
97         this.broker = db;
98         this.singleTxDB = new SingleTransactionDataBroker(db);
99         this.lockManager = lockManager;
100         this.idUtils = idUtils;
101         this.localPool = new ConcurrentHashMap<>();
102         populateCache();
103     }
104
105     @Override
106     public Map<String, String> getLocalPoolsDetails() {
107         Map<String, String> map = new HashMap<>();
108         localPool.entrySet().stream().forEach(entry -> map.put(entry.getKey(), entry.getValue().toString()));
109         return map;
110     }
111
112     @PostConstruct
113     public void start() {
114         LOG.info("{} start", getClass().getSimpleName());
115     }
116
117     @PreDestroy
118     public void close() {
119         LOG.info("{} close", getClass().getSimpleName());
120     }
121
122     private void populateCache() throws ReadFailedException {
123         // If IP changes during reboot, then there will be orphaned child pools.
124         InstanceIdentifier<IdPools> idPoolsInstance = idUtils.getIdPools();
125         Optional<IdPools> idPoolsOptional = singleTxDB.syncReadOptional(CONFIGURATION, idPoolsInstance);
126         if (!idPoolsOptional.isPresent()) {
127             return;
128         }
129         IdPools idPools = idPoolsOptional.get();
130         List<IdPool> idPoolList = idPools.getIdPool();
131         idPoolList
132                 .parallelStream()
133                 .filter(idPool -> idPool.getParentPoolName() != null
134                         && !idPool.getParentPoolName().isEmpty()
135                         && idUtils.getLocalPoolName(idPool.getParentPoolName())
136                                 .equals(idPool.getPoolName()))
137                 .forEach(
138                     idPool -> updateLocalIdPoolCache(idPool,
139                         idPool.getParentPoolName()));
140     }
141
142     public boolean updateLocalIdPoolCache(IdPool idPool, String parentPoolName) {
143         AvailableIdsHolder availableIdsHolder = idPool.getAvailableIdsHolder();
144         AvailableIdHolder availableIdHolder = new AvailableIdHolder(idUtils, availableIdsHolder.getStart(),
145                 availableIdsHolder.getEnd());
146         availableIdHolder.setCur(availableIdsHolder.getCursor());
147         ReleasedIdsHolder releasedIdsHolder = idPool.getReleasedIdsHolder();
148         ReleasedIdHolder releasedIdHolder = new ReleasedIdHolder(idUtils, releasedIdsHolder.getDelayedTimeSec());
149         releasedIdHolder.setAvailableIdCount(releasedIdsHolder.getAvailableIdCount());
150         List<DelayedIdEntries> delayedEntries = releasedIdsHolder.getDelayedIdEntries();
151         List<DelayedIdEntry> delayedIdEntryInCache = new CopyOnWriteArrayList<>();
152         if (delayedEntries != null) {
153             delayedIdEntryInCache = delayedEntries
154                     .parallelStream()
155                     .map(delayedIdEntry -> new DelayedIdEntry(delayedIdEntry
156                             .getId(), delayedIdEntry.getReadyTimeSec()))
157                             .sorted((idEntry1, idEntry2) -> Long.compare(idEntry1.getReadyTimeSec(),
158                                     idEntry2.getReadyTimeSec())).collect(Collectors.toList());
159         }
160         releasedIdHolder.setDelayedEntries(delayedIdEntryInCache);
161
162         IdLocalPool idLocalPool = new IdLocalPool(idUtils, idPool.getPoolName());
163         idLocalPool.setAvailableIds(availableIdHolder);
164         idLocalPool.setReleasedIds(releasedIdHolder);
165         localPool.put(parentPoolName, idLocalPool);
166         if (LOG.isDebugEnabled()) {
167             LOG.debug("Populating cache for {} with {}", idLocalPool.getPoolName(), idLocalPool);
168         }
169         return true;
170     }
171
172     @Override
173     public Future<RpcResult<Void>> createIdPool(CreateIdPoolInput input) {
174         if (LOG.isDebugEnabled()) {
175             LOG.debug("createIdPool called with input {}", input);
176         }
177         String poolName = input.getPoolName();
178         long low = input.getLow();
179         long high = input.getHigh();
180         long blockSize = idUtils.computeBlockSize(low, high);
181         Future<RpcResult<Void>> futureResult;
182         try {
183             idUtils.lock(lockManager, poolName);
184             WriteTransaction tx = broker.newWriteOnlyTransaction();
185             poolName = poolName.intern();
186             IdPool idPool;
187             idPool = createGlobalPool(tx, poolName, low, high, blockSize);
188             String localPoolName = idUtils.getLocalPoolName(poolName);
189             IdLocalPool idLocalPool = localPool.get(poolName);
190             if (idLocalPool == null) {
191                 createLocalPool(tx, localPoolName, idPool);
192                 idUtils.updateChildPool(tx, idPool.getPoolName(), localPoolName);
193             }
194             tx.submit().checkedGet();
195             futureResult = RpcResultBuilder.<Void>success().buildFuture();
196         } catch (OperationFailedException | IdManagerException e) {
197             futureResult = buildFailedRpcResultFuture("createIdPool failed: " + input.toString(), e);
198         } finally {
199             idUtils.unlock(lockManager, poolName);
200         }
201         return futureResult;
202     }
203
204     @Override
205     public Future<RpcResult<AllocateIdOutput>> allocateId(AllocateIdInput input) {
206         if (LOG.isDebugEnabled()) {
207             LOG.debug("AllocateId called with input {}", input);
208         }
209         String idKey = input.getIdKey();
210         String poolName = input.getPoolName();
211         String localPoolName = idUtils.getLocalPoolName(poolName);
212         long newIdValue = -1;
213         AllocateIdOutputBuilder output = new AllocateIdOutputBuilder();
214         Future<RpcResult<AllocateIdOutput>> futureResult;
215         String uniqueKey = idUtils.getUniqueKey(poolName, idKey);
216         try {
217             idUtils.lock(lockManager, uniqueKey);
218             //allocateIdFromLocalPool method returns a list of IDs with one element. This element is obtained by get(0)
219             newIdValue = allocateIdFromLocalPool(poolName, localPoolName, idKey, 1).get(0);
220             output.setIdValue(newIdValue);
221             futureResult = RpcResultBuilder.<AllocateIdOutput>success().withResult(output.build()).buildFuture();
222         } catch (OperationFailedException | IdManagerException e) {
223             java.util.Optional.ofNullable(
224                     idUtils.allocatedIdMap.remove(idUtils.getUniqueKey(poolName, idKey)))
225                     .ifPresent(futureId -> futureId.completeExceptionally(e));
226             futureResult = buildFailedRpcResultFuture("allocateId failed: " + input.toString(), e);
227             idUtils.unlock(lockManager, uniqueKey);
228         }
229         return futureResult;
230     }
231
232     @Override
233     public Future<RpcResult<AllocateIdRangeOutput>> allocateIdRange(AllocateIdRangeInput input) {
234         if (LOG.isDebugEnabled()) {
235             LOG.debug("AllocateIdRange called with input {}", input);
236         }
237         String idKey = input.getIdKey();
238         String poolName = input.getPoolName();
239         long size = input.getSize();
240         String localPoolName = idUtils.getLocalPoolName(poolName);
241         List<Long> newIdValuesList = new ArrayList<>();
242         AllocateIdRangeOutputBuilder output = new AllocateIdRangeOutputBuilder();
243         Future<RpcResult<AllocateIdRangeOutput>> futureResult;
244         String uniqueKey = idUtils.getUniqueKey(poolName, idKey);
245         try {
246             idUtils.lock(lockManager, uniqueKey);
247             newIdValuesList = allocateIdFromLocalPool(poolName, localPoolName, idKey, size);
248             Collections.sort(newIdValuesList);
249             output.setIdValues(newIdValuesList);
250             futureResult = RpcResultBuilder.<AllocateIdRangeOutput>success().withResult(output.build()).buildFuture();
251         } catch (OperationFailedException | IdManagerException e) {
252             java.util.Optional.ofNullable(
253                     idUtils.allocatedIdMap.remove(idUtils.getUniqueKey(poolName, idKey)))
254                     .ifPresent(futureId -> futureId.completeExceptionally(e));
255             futureResult = buildFailedRpcResultFuture("allocateIdRange failed: " + input.toString(), e);
256             idUtils.unlock(lockManager, uniqueKey);
257         }
258         return futureResult;
259     }
260
261     @Override
262     public Future<RpcResult<Void>> deleteIdPool(DeleteIdPoolInput input) {
263         if (LOG.isDebugEnabled()) {
264             LOG.debug("DeleteIdPool called with input {}", input);
265         }
266         String poolName = input.getPoolName();
267         Future<RpcResult<Void>> futureResult;
268         try {
269             InstanceIdentifier<IdPool> idPoolToBeDeleted = idUtils.getIdPoolInstance(poolName);
270             poolName = poolName.intern();
271             synchronized (poolName) {
272                 IdPool idPool = singleTxDB.syncRead(CONFIGURATION, idPoolToBeDeleted);
273                 List<ChildPools> childPoolList = idPool.getChildPools();
274                 if (childPoolList != null) {
275                     childPoolList.parallelStream().forEach(childPool -> deletePool(childPool.getChildPoolName()));
276                 }
277                 singleTxDB.syncDelete(CONFIGURATION, idPoolToBeDeleted);
278                 if (LOG.isDebugEnabled()) {
279                     LOG.debug("Deleted id pool {}", poolName);
280                 }
281             }
282             futureResult = RpcResultBuilder.<Void>success().buildFuture();
283         } catch (OperationFailedException e) {
284             futureResult = buildFailedRpcResultFuture("deleteIdPool failed: " + input.toString(), e);
285         }
286         return futureResult;
287     }
288
289     @Override
290     public Future<RpcResult<Void>> releaseId(ReleaseIdInput input) {
291         String poolName = input.getPoolName();
292         String idKey = input.getIdKey();
293         Future<RpcResult<Void>> futureResult;
294         String uniqueKey = idUtils.getUniqueKey(poolName, idKey);
295         try {
296             idUtils.lock(lockManager, uniqueKey);
297             releaseIdFromLocalPool(poolName, idUtils.getLocalPoolName(poolName), idKey);
298             futureResult = RpcResultBuilder.<Void>success().buildFuture();
299         } catch (ReadFailedException | IdManagerException e) {
300             futureResult = buildFailedRpcResultFuture("releaseId failed: " + input.toString(), e);
301             idUtils.unlock(lockManager, uniqueKey);
302         }
303         return futureResult;
304     }
305
306     private <T> ListenableFuture<RpcResult<T>> buildFailedRpcResultFuture(String msg, Exception exception) {
307         LOG.error(msg, exception);
308         RpcResultBuilder<T> failedRpcResultBuilder = RpcResultBuilder.failed();
309         failedRpcResultBuilder.withError(ErrorType.APPLICATION, msg, exception);
310         if (exception instanceof OperationFailedException) {
311             failedRpcResultBuilder.withRpcErrors(((OperationFailedException) exception).getErrorList());
312         }
313         return failedRpcResultBuilder.buildFuture();
314     }
315
316     private List<Long> allocateIdFromLocalPool(String parentPoolName, String localPoolName,
317             String idKey, long size) throws OperationFailedException, IdManagerException {
318         if (LOG.isDebugEnabled()) {
319             LOG.debug("Allocating id from local pool {}. Parent pool {}. Idkey {}", localPoolName, parentPoolName,
320                     idKey);
321         }
322         List<Long> newIdValuesList = new ArrayList<>();
323         String uniqueIdKey = idUtils.getUniqueKey(parentPoolName, idKey);
324         CompletableFuture<List<Long>> futureIdValues = new CompletableFuture<>();
325         CompletableFuture<List<Long>> existingFutureIdValue =
326                 idUtils.allocatedIdMap.putIfAbsent(uniqueIdKey, futureIdValues);
327         if (existingFutureIdValue != null) {
328             try {
329                 newIdValuesList = existingFutureIdValue.get();
330                 return newIdValuesList;
331             } catch (InterruptedException | ExecutionException e) {
332                 LOG.warn("Could not obtain id from existing futureIdValue for idKey {} and pool {}.",
333                         idKey, parentPoolName);
334                 throw new IdManagerException(e.getMessage(), e);
335             }
336         }
337         long newIdValue = -1;
338         localPoolName = localPoolName.intern();
339         InstanceIdentifier<IdPool> parentIdPoolInstanceIdentifier = idUtils.getIdPoolInstance(parentPoolName);
340         InstanceIdentifier<IdEntries> existingId = idUtils.getIdEntry(parentIdPoolInstanceIdentifier, idKey);
341         Optional<IdEntries> existingIdEntry = singleTxDB.syncReadOptional(CONFIGURATION, existingId);
342         if (existingIdEntry.isPresent()) {
343             newIdValuesList = existingIdEntry.get().getIdValue();
344             if (LOG.isDebugEnabled()) {
345                 LOG.debug("Existing ids {} for the key {} ", newIdValuesList, idKey);
346             }
347             // Inform other waiting threads about this new value.
348             futureIdValues.complete(newIdValuesList);
349             // This is to avoid stale entries in the map. If this thread had populated the map,
350             // then the entry should be removed.
351             if (existingFutureIdValue == null) {
352                 idUtils.allocatedIdMap.remove(uniqueIdKey);
353             }
354             idUtils.unlock(lockManager, uniqueIdKey);
355             return newIdValuesList;
356         }
357         //This get will not help in concurrent reads. Hence the same read needs to be done again.
358         IdLocalPool localIdPool = localPool.get(parentPoolName);
359         if (localIdPool == null) {
360             idUtils.lock(lockManager, parentPoolName);
361             try {
362                 //Check if a previous thread that got the cluster-wide lock first, has created the localPool
363                 if (localPool.get(parentPoolName) == null) {
364                     WriteTransaction tx = broker.newWriteOnlyTransaction();
365                     IdPool parentIdPool = singleTxDB.syncRead(CONFIGURATION, parentIdPoolInstanceIdentifier);
366                     localIdPool = createLocalPool(tx, localPoolName, parentIdPool); // Return localIdPool.....
367                     tx.submit().checkedGet();
368                 } else {
369                     localIdPool = localPool.get(parentPoolName);
370                 }
371             } finally {
372                 idUtils.unlock(lockManager, parentPoolName);
373             }
374         }
375         if (LOG.isDebugEnabled()) {
376             LOG.debug("Got pool {}", localIdPool);
377         }
378         if (size == 1) {
379             newIdValue = getIdFromLocalPoolCache(localIdPool, parentPoolName);
380             newIdValuesList.add(newIdValue);
381         } else {
382             IdPool parentIdPool = singleTxDB.syncRead(CONFIGURATION, parentIdPoolInstanceIdentifier);
383             long totalAvailableIdCount = localIdPool.getAvailableIds().getAvailableIdCount()
384                     + localIdPool.getReleasedIds().getAvailableIdCount();
385             AvailableIdsHolderBuilder availableParentIds = idUtils.getAvailableIdsHolderBuilder(parentIdPool);
386             ReleasedIdsHolderBuilder releasedParentIds = idUtils.getReleaseIdsHolderBuilder(parentIdPool);
387             totalAvailableIdCount = totalAvailableIdCount + releasedParentIds.getAvailableIdCount()
388                     + idUtils.getAvailableIdsCount(availableParentIds);
389             if (totalAvailableIdCount > size) {
390                 while (size > 0) {
391                     try {
392                         newIdValue = getIdFromLocalPoolCache(localIdPool, parentPoolName);
393                     } catch (OperationFailedException e) {
394                         if (LOG.isDebugEnabled()) {
395                             LOG.debug("Releasing IDs to pool {}", localPoolName);
396                         }
397                         // Releasing the IDs added in newIdValuesList since a null list would be returned now, as the
398                         // requested size of list IDs exceeds the number of available IDs.
399                         updateDelayedEntriesInLocalCache(newIdValuesList, parentPoolName, localIdPool);
400                     }
401                     newIdValuesList.add(newIdValue);
402                     size--;
403                 }
404             } else {
405                 throw new IdManagerException(String.format("Ids exhausted for pool : %s", parentPoolName));
406             }
407         }
408         if (LOG.isDebugEnabled()) {
409             LOG.debug("The newIdValues {} for the idKey {}", newIdValuesList, idKey);
410         }
411         idUtils.releaseIdLatchMap.put(uniqueIdKey, new CountDownLatch(1));
412         UpdateIdEntryJob job = new UpdateIdEntryJob(parentPoolName, localPoolName, idKey, newIdValuesList, broker,
413                 idUtils, lockManager);
414         DataStoreJobCoordinator.getInstance().enqueueJob(parentPoolName, job, IdUtils.RETRY_COUNT);
415         futureIdValues.complete(newIdValuesList);
416         return newIdValuesList;
417     }
418
419     private Long getIdFromLocalPoolCache(IdLocalPool localIdPool, String parentPoolName)
420             throws OperationFailedException, IdManagerException {
421         while (true) {
422             IdHolder releasedIds = localIdPool.getReleasedIds();
423             Optional<Long> releasedId = Optional.absent();
424             releasedId = releasedIds.allocateId();
425             if (releasedId.isPresent()) {
426                 IdHolderSyncJob poolSyncJob =
427                         new IdHolderSyncJob(localIdPool.getPoolName(), localIdPool.getReleasedIds(), broker,
428                                 idUtils);
429                 DataStoreJobCoordinator.getInstance().enqueueJob(localIdPool.getPoolName(),
430                         poolSyncJob, IdUtils.RETRY_COUNT);
431                 return releasedId.get();
432             }
433             Optional<Long> availableId = Optional.absent();
434             IdHolder availableIds = localIdPool.getAvailableIds();
435             if (availableIds != null) {
436                 availableId = availableIds.allocateId();
437                 if (availableId.isPresent()) {
438                     IdHolderSyncJob poolSyncJob =
439                             new IdHolderSyncJob(localIdPool.getPoolName(), localIdPool.getAvailableIds(),
440                                     broker, idUtils);
441                     DataStoreJobCoordinator.getInstance().enqueueJob(localIdPool.getPoolName(),
442                             poolSyncJob, IdUtils.RETRY_COUNT);
443                     return availableId.get();
444                 }
445             }
446             long idCount = getIdBlockFromParentPool(parentPoolName, localIdPool);
447             if (idCount <= 0) {
448                 if (LOG.isDebugEnabled()) {
449                     LOG.debug("Unable to allocate Id block from global pool");
450                 }
451                 throw new IdManagerException(String.format("Ids exhausted for pool : %s", parentPoolName));
452             }
453         }
454     }
455
456     /**
457      * Changes made to availableIds and releasedIds will not be persisted to the datastore.
458      */
459     private long getIdBlockFromParentPool(String parentPoolName, IdLocalPool localIdPool)
460             throws OperationFailedException, IdManagerException {
461         if (LOG.isDebugEnabled()) {
462             LOG.debug("Allocating block of id from parent pool {}", parentPoolName);
463         }
464         InstanceIdentifier<IdPool> idPoolInstanceIdentifier = idUtils.getIdPoolInstance(parentPoolName);
465         parentPoolName = parentPoolName.intern();
466         idUtils.lock(lockManager, parentPoolName);
467         long idCount = 0;
468         try {
469             // Check if the childpool already got id block.
470             long availableIdCount =
471                     localIdPool.getAvailableIds().getAvailableIdCount()
472                             + localIdPool.getReleasedIds().getAvailableIdCount();
473             if (availableIdCount > 0) {
474                 return availableIdCount;
475             }
476             WriteTransaction tx = broker.newWriteOnlyTransaction();
477             IdPool parentIdPool = singleTxDB.syncRead(CONFIGURATION, idPoolInstanceIdentifier);
478             idCount = allocateIdBlockFromParentPool(localIdPool, parentIdPool, tx);
479             tx.submit().checkedGet();
480         } catch (IdManagerException | NullPointerException e) {
481             LOG.error("Error getting id block from parent pool. {}", e.getMessage());
482         } finally {
483             idUtils.unlock(lockManager, parentPoolName);
484         }
485         return idCount;
486     }
487
488     private long allocateIdBlockFromParentPool(IdLocalPool localPoolCache, IdPool parentIdPool, WriteTransaction tx)
489             throws OperationFailedException, IdManagerException {
490         long idCount = -1;
491         ReleasedIdsHolderBuilder releasedIdsBuilderParent = idUtils.getReleaseIdsHolderBuilder(parentIdPool);
492         while (true) {
493             idCount = allocateIdBlockFromReleasedIdsHolder(localPoolCache, releasedIdsBuilderParent, parentIdPool, tx);
494             if (idCount > 0) {
495                 return idCount;
496             }
497             idCount = allocateIdBlockFromAvailableIdsHolder(localPoolCache, parentIdPool, tx);
498             if (idCount > 0) {
499                 return idCount;
500             }
501             idCount = getIdsFromOtherChildPools(releasedIdsBuilderParent, parentIdPool);
502             if (idCount <= 0) {
503                 if (LOG.isDebugEnabled()) {
504                     LOG.debug("Unable to allocate Id block from global pool");
505                 }
506                 throw new IdManagerException(String.format("Ids exhausted for pool : %s", parentIdPool.getPoolName()));
507             }
508         }
509     }
510
511     private long getIdsFromOtherChildPools(ReleasedIdsHolderBuilder releasedIdsBuilderParent, IdPool parentIdPool)
512             throws OperationFailedException {
513         List<ChildPools> childPoolsList = parentIdPool.getChildPools();
514         // Sorting the child pools on last accessed time so that the pool that
515         // was not accessed for a long time comes first.
516         Collections.sort(childPoolsList,
517             (childPool1, childPool2) -> childPool1.getLastAccessTime().compareTo(childPool2.getLastAccessTime()));
518         long currentTime = System.currentTimeMillis() / 1000;
519         for (ChildPools childPools : childPoolsList) {
520             if (childPools.getLastAccessTime() + DEFAULT_IDLE_TIME > currentTime) {
521                 break;
522             }
523             if (!childPools.getChildPoolName().equals(idUtils.getLocalPoolName(parentIdPool.getPoolName()))) {
524                 InstanceIdentifier<IdPool> idPoolInstanceIdentifier = idUtils
525                         .getIdPoolInstance(childPools.getChildPoolName());
526                 IdPool otherChildPool = singleTxDB.syncRead(CONFIGURATION, idPoolInstanceIdentifier);
527                 ReleasedIdsHolderBuilder releasedIds = idUtils.getReleaseIdsHolderBuilder(otherChildPool);
528
529                 List<DelayedIdEntries> delayedIdEntriesChild = releasedIds.getDelayedIdEntries();
530                 List<DelayedIdEntries> delayedIdEntriesParent = releasedIdsBuilderParent.getDelayedIdEntries();
531                 if (delayedIdEntriesParent == null) {
532                     delayedIdEntriesParent = new LinkedList<>();
533                 }
534                 delayedIdEntriesParent.addAll(delayedIdEntriesChild);
535                 delayedIdEntriesChild.removeAll(delayedIdEntriesChild);
536
537                 AvailableIdsHolderBuilder availableIds = idUtils.getAvailableIdsHolderBuilder(otherChildPool);
538                 while (idUtils.isIdAvailable(availableIds)) {
539                     long cursor = availableIds.getCursor() + 1;
540                     delayedIdEntriesParent.add(idUtils.createDelayedIdEntry(cursor, currentTime));
541                     availableIds.setCursor(cursor);
542                 }
543
544                 long totalAvailableIdCount = releasedIds.getDelayedIdEntries().size()
545                         + idUtils.getAvailableIdsCount(availableIds);
546                 long count = releasedIdsBuilderParent.getAvailableIdCount() + totalAvailableIdCount;
547                 releasedIdsBuilderParent.setDelayedIdEntries(delayedIdEntriesParent).setAvailableIdCount(count);
548                 singleTxDB.syncUpdate(CONFIGURATION, idPoolInstanceIdentifier,
549                         new IdPoolBuilder().setKey(new IdPoolKey(otherChildPool.getPoolName()))
550                                 .setAvailableIdsHolder(availableIds.build()).setReleasedIdsHolder(releasedIds.build())
551                                 .build());
552                 return totalAvailableIdCount;
553             }
554         }
555         return 0;
556     }
557
558     private long allocateIdBlockFromReleasedIdsHolder(IdLocalPool localIdPool,
559             ReleasedIdsHolderBuilder releasedIdsBuilderParent, IdPool parentIdPool, WriteTransaction tx) {
560         if (releasedIdsBuilderParent.getAvailableIdCount() == 0) {
561             if (LOG.isDebugEnabled()) {
562                 LOG.debug("Ids unavailable in releasedIds of parent pool {}", parentIdPool);
563             }
564             return 0;
565         }
566         List<DelayedIdEntries> delayedIdEntriesParent = releasedIdsBuilderParent.getDelayedIdEntries();
567         int idCount = Math.min(delayedIdEntriesParent.size(), parentIdPool.getBlockSize());
568         List<DelayedIdEntries> idEntriesToBeRemoved = delayedIdEntriesParent.subList(0, idCount);
569         ReleasedIdHolder releasedIds = (ReleasedIdHolder) localIdPool.getReleasedIds();
570         List<DelayedIdEntry> delayedIdEntriesLocalCache = releasedIds.getDelayedEntries();
571         delayedIdEntriesLocalCache = idEntriesToBeRemoved
572                 .parallelStream()
573                 .map(delayedIdEntry -> new DelayedIdEntry(delayedIdEntry
574                         .getId(), delayedIdEntry.getReadyTimeSec()))
575                 .sorted((idEntry1, idEntry2) -> Long.compare(idEntry1.getReadyTimeSec(),
576                         idEntry2.getReadyTimeSec())).collect(Collectors.toList());
577         releasedIds.setDelayedEntries(delayedIdEntriesLocalCache);
578         releasedIds.setAvailableIdCount(releasedIds.getAvailableIdCount() + idCount);
579         localIdPool.setReleasedIds(releasedIds);
580         delayedIdEntriesParent.removeAll(idEntriesToBeRemoved);
581         releasedIdsBuilderParent.setDelayedIdEntries(delayedIdEntriesParent);
582         InstanceIdentifier<ReleasedIdsHolder> releasedIdsHolderInstanceIdentifier = InstanceIdentifier
583                 .builder(IdPools.class).child(IdPool.class,
584                         new IdPoolKey(parentIdPool.getPoolName())).child(ReleasedIdsHolder.class).build();
585         releasedIdsBuilderParent.setAvailableIdCount(releasedIdsBuilderParent.getAvailableIdCount() - idCount);
586         if (LOG.isDebugEnabled()) {
587             LOG.debug("Allocated {} ids from releasedIds of parent pool {}", idCount, parentIdPool);
588         }
589         tx.merge(CONFIGURATION, releasedIdsHolderInstanceIdentifier,
590                 releasedIdsBuilderParent.build(), true);
591         return idCount;
592     }
593
594     private long allocateIdBlockFromAvailableIdsHolder(IdLocalPool localIdPool, IdPool parentIdPool,
595             WriteTransaction tx) {
596         long idCount = 0;
597         AvailableIdsHolderBuilder availableIdsBuilderParent = idUtils.getAvailableIdsHolderBuilder(parentIdPool);
598         long end = availableIdsBuilderParent.getEnd();
599         long cur = availableIdsBuilderParent.getCursor();
600         if (!idUtils.isIdAvailable(availableIdsBuilderParent)) {
601             if (LOG.isDebugEnabled()) {
602                 LOG.debug("Ids exhausted in parent pool {}", parentIdPool);
603             }
604             return idCount;
605         }
606         // Update availableIdsHolder of Local Pool
607         idCount = Math.min(end - cur, parentIdPool.getBlockSize());
608         AvailableIdHolder availableIds = new AvailableIdHolder(idUtils, cur + 1, cur + idCount);
609         localIdPool.setAvailableIds(availableIds);
610         // Update availableIdsHolder of Global Pool
611         InstanceIdentifier<AvailableIdsHolder> availableIdsHolderInstanceIdentifier = InstanceIdentifier
612                 .builder(IdPools.class).child(IdPool.class,
613                         new IdPoolKey(parentIdPool.getPoolName())).child(AvailableIdsHolder.class).build();
614         availableIdsBuilderParent.setCursor(cur + idCount);
615         if (LOG.isDebugEnabled()) {
616             LOG.debug("Allocated {} ids from availableIds of global pool {}", idCount, parentIdPool);
617         }
618         tx.merge(CONFIGURATION, availableIdsHolderInstanceIdentifier,
619                 availableIdsBuilderParent.build(), true);
620         return idCount;
621     }
622
623     private void releaseIdFromLocalPool(String parentPoolName, String localPoolName, String idKey)
624             throws ReadFailedException, IdManagerException {
625         String idLatchKey = idUtils.getUniqueKey(parentPoolName, idKey);
626         if (LOG.isDebugEnabled()) {
627             LOG.debug("Releasing ID {} from pool {}", idKey, localPoolName);
628         }
629         java.util.Optional.ofNullable(idUtils.releaseIdLatchMap.get(idLatchKey)).ifPresent(latch -> {
630             try {
631                 latch.await(10, TimeUnit.SECONDS);
632             } catch (InterruptedException ignored) {
633                 LOG.warn("Thread interrupted while releasing id {} from id pool {}", idKey, parentPoolName);
634             } finally {
635                 idUtils.releaseIdLatchMap.remove(idLatchKey);
636             }
637         });
638         localPoolName = localPoolName.intern();
639         InstanceIdentifier<IdPool> parentIdPoolInstanceIdentifier = idUtils.getIdPoolInstance(parentPoolName);
640         IdPool parentIdPool = singleTxDB.syncRead(CONFIGURATION, parentIdPoolInstanceIdentifier);
641         List<IdEntries> idEntries = parentIdPool.getIdEntries();
642         List<IdEntries> newIdEntries = idEntries;
643         if (idEntries == null) {
644             throw new IdManagerException("Id Entries does not exist");
645         }
646         InstanceIdentifier<IdEntries> existingId = idUtils.getIdEntry(parentIdPoolInstanceIdentifier, idKey);
647         Optional<IdEntries> existingIdEntryObject = singleTxDB.syncReadOptional(CONFIGURATION, existingId);
648         if (!existingIdEntryObject.isPresent()) {
649             if (LOG.isDebugEnabled()) {
650                 LOG.debug("Specified Id key {} does not exist in id pool {}", idKey, parentPoolName);
651             }
652             idUtils.unlock(lockManager, idLatchKey);
653             return;
654         }
655         IdEntries existingIdEntry = existingIdEntryObject.get();
656         List<Long> idValuesList = existingIdEntry.getIdValue();
657         IdLocalPool localIdPoolCache = localPool.get(parentPoolName);
658         boolean isRemoved = newIdEntries.remove(existingIdEntry);
659         if (LOG.isDebugEnabled()) {
660             LOG.debug("The entry {} is removed {}", existingIdEntry, isRemoved);
661         }
662         updateDelayedEntriesInLocalCache(idValuesList, parentPoolName, localIdPoolCache);
663         IdHolderSyncJob poolSyncJob = new IdHolderSyncJob(localPoolName, localIdPoolCache.getReleasedIds(), broker,
664                 idUtils);
665         DataStoreJobCoordinator.getInstance().enqueueJob(localPoolName, poolSyncJob, IdUtils.RETRY_COUNT);
666         scheduleCleanUpTask(localIdPoolCache, parentPoolName, parentIdPool.getBlockSize());
667         if (LOG.isDebugEnabled()) {
668             LOG.debug("Released id ({}, {}) from pool {}", idKey, idValuesList, localPoolName);
669         }
670         // Updating id entries in the parent pool. This will be used for restart scenario
671         UpdateIdEntryJob job = new UpdateIdEntryJob(parentPoolName, localPoolName, idKey, null, broker, idUtils,
672                         lockManager);
673         DataStoreJobCoordinator.getInstance().enqueueJob(parentPoolName, job, IdUtils.RETRY_COUNT);
674     }
675
676     private void scheduleCleanUpTask(final IdLocalPool localIdPoolCache,
677             final String parentPoolName, final int blockSize) {
678         TimerTask scheduledTask = new TimerTask() {
679             @Override
680             public void run() {
681                 CleanUpJob job = new CleanUpJob(localIdPoolCache, broker, parentPoolName, blockSize, lockManager,
682                         idUtils);
683                 DataStoreJobCoordinator.getInstance().enqueueJob(localIdPoolCache.getPoolName(), job,
684                         IdUtils.RETRY_COUNT);
685             }
686         };
687         cleanJobTimer.schedule(scheduledTask, IdUtils.DEFAULT_DELAY_TIME * 1000);
688     }
689
690     private IdPool createGlobalPool(WriteTransaction tx, String poolName, long low, long high, long blockSize)
691             throws ReadFailedException {
692         IdPool idPool;
693         InstanceIdentifier<IdPool> idPoolInstanceIdentifier = idUtils.getIdPoolInstance(poolName);
694         Optional<IdPool> existingIdPool = singleTxDB.syncReadOptional(CONFIGURATION, idPoolInstanceIdentifier);
695         if (!existingIdPool.isPresent()) {
696             if (LOG.isDebugEnabled()) {
697                 LOG.debug("Creating new global pool {}", poolName);
698             }
699             idPool = idUtils.createGlobalPool(poolName, low, high, blockSize);
700             tx.put(CONFIGURATION, idPoolInstanceIdentifier, idPool, true);
701         } else {
702             idPool = existingIdPool.get();
703             if (LOG.isDebugEnabled()) {
704                 LOG.debug("GlobalPool exists {}", idPool);
705             }
706         }
707         return idPool;
708     }
709
710     private IdLocalPool createLocalPool(WriteTransaction tx, String localPoolName, IdPool idPool)
711             throws OperationFailedException, IdManagerException {
712         localPoolName = localPoolName.intern();
713         IdLocalPool idLocalPool = new IdLocalPool(idUtils, localPoolName);
714         allocateIdBlockFromParentPool(idLocalPool, idPool, tx);
715         String parentPool = idPool.getPoolName();
716         localPool.put(parentPool, idLocalPool);
717         LocalPoolCreateJob job = new LocalPoolCreateJob(idLocalPool, broker, idPool.getPoolName(),
718                 idPool.getBlockSize(), idUtils);
719         DataStoreJobCoordinator.getInstance().enqueueJob(localPoolName, job, IdUtils.RETRY_COUNT);
720         return idLocalPool;
721     }
722
723     private void deletePool(String poolName) {
724         LocalPoolDeleteJob job = new LocalPoolDeleteJob(poolName, broker, idUtils);
725         DataStoreJobCoordinator.getInstance().enqueueJob(poolName, job, IdUtils.RETRY_COUNT);
726     }
727
728     public void poolDeleted(String parentPoolName, String poolName) {
729         IdLocalPool idLocalPool = localPool.get(parentPoolName);
730         if (idLocalPool != null) {
731             if (idLocalPool.getPoolName().equals(poolName)) {
732                 localPool.remove(parentPoolName);
733             }
734         }
735     }
736
737     private void updateDelayedEntriesInLocalCache(List<Long> idsList, String parentPoolName,
738             IdLocalPool localPoolCache) {
739         for (long idValue : idsList) {
740             localPoolCache.getReleasedIds().addId(idValue);
741         }
742         localPool.put(parentPoolName, localPoolCache);
743     }
744
745     public java.util.Optional<IdLocalPool> getIdLocalPool(String parentPoolName) {
746         return java.util.Optional.ofNullable(localPool.get(parentPoolName))
747                 .map(localPool -> localPool.deepCopyOf());
748     }
749
750 }