/*
 * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.genius.idmanager;
11 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
13 import com.google.common.base.Optional;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import java.util.ArrayList;
16 import java.util.Collections;
17 import java.util.HashMap;
18 import java.util.LinkedList;
19 import java.util.List;
import java.util.Map;
21 import java.util.Timer;
22 import java.util.TimerTask;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.ConcurrentMap;
25 import java.util.concurrent.CountDownLatch;
26 import java.util.concurrent.Future;
27 import java.util.concurrent.TimeUnit;
28 import java.util.stream.Collectors;
29 import javax.annotation.PostConstruct;
30 import javax.annotation.PreDestroy;
31 import javax.inject.Inject;
32 import javax.inject.Singleton;
33 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
34 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
35 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
36 import org.opendaylight.genius.datastoreutils.DataStoreJobCoordinator;
37 import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
38 import org.opendaylight.genius.idmanager.ReleasedIdHolder.DelayedIdEntry;
39 import org.opendaylight.genius.idmanager.api.IdManagerMonitor;
40 import org.opendaylight.genius.idmanager.jobs.CleanUpJob;
41 import org.opendaylight.genius.idmanager.jobs.IdHolderSyncJob;
42 import org.opendaylight.genius.idmanager.jobs.LocalPoolCreateJob;
43 import org.opendaylight.genius.idmanager.jobs.LocalPoolDeleteJob;
44 import org.opendaylight.genius.idmanager.jobs.UpdateIdEntryJob;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdInput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdOutput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdOutputBuilder;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdRangeInput;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdRangeOutput;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdRangeOutputBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.CreateIdPoolInput;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.DeleteIdPoolInput;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.IdManagerService;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.IdPools;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.ReleaseIdInput;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.IdPool;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.IdPoolBuilder;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.IdPoolKey;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.AvailableIdsHolder;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.AvailableIdsHolderBuilder;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.ChildPools;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.IdEntries;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.ReleasedIdsHolder;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.ReleasedIdsHolderBuilder;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.released.ids.DelayedIdEntries;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.LockManagerService;
67 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
68 import org.opendaylight.yangtools.yang.common.OperationFailedException;
69 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
70 import org.opendaylight.yangtools.yang.common.RpcResult;
71 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
72 import org.slf4j.Logger;
73 import org.slf4j.LoggerFactory;
/**
 * Implements the {@code IdManagerService} RPCs and the {@code IdManagerMonitor} view on top of an
 * in-memory cache of local id pools and the CONFIGURATION datastore.
 */
76 public class IdManager implements IdManagerService, IdManagerMonitor {
78 private static final Logger LOG = LoggerFactory.getLogger(IdManager.class);
// Idle threshold for child pools: 24 hours expressed in seconds (compared against epoch seconds).
79 private static final long DEFAULT_IDLE_TIME = 24 * 60 * 60;
// Broker used to open write transactions; also handed to the datastore jobs created in this class.
81 private final DataBroker broker;
// Wrapper built around the same broker; used for the synchronous read/update/delete calls.
82 private final SingleTransactionDataBroker singleTxDB;
// Lock service passed to idUtils.lockPool / idUtils.unlockPool.
83 private final LockManagerService lockManager;
// Helper providing instance identifiers, pool naming, locking and holder builders.
84 private final IdUtils idUtils;
// Cached local pools, keyed by the name of their parent pool.
86 private final ConcurrentMap<String, IdLocalPool> localPool;
// Timer on which the delayed clean-up task is scheduled after an id is released.
87 private final Timer cleanJobTimer = new Timer();
/**
 * Creates the id manager and warms the local pool cache from the CONFIGURATION datastore.
 *
 * @param db data broker used for write transactions and synchronous datastore access
 * @param lockManager lock service handed to {@link IdUtils} when locking and unlocking pools
 * @param idUtils helper for identifiers, pool naming, locking and holder builders
 * @throws ReadFailedException if the existing pools cannot be read while populating the cache
 */
@Inject
public IdManager(DataBroker db, LockManagerService lockManager, IdUtils idUtils) throws ReadFailedException {
    // broker is a final field used by every transaction in this class, so it must be assigned here.
    this.broker = db;
    this.singleTxDB = new SingleTransactionDataBroker(db);
    this.lockManager = lockManager;
    this.idUtils = idUtils;
    this.localPool = new ConcurrentHashMap<>();
    // Populating the cache is the only step here that can raise the declared ReadFailedException.
    populateCache();
}
100 public Map<String, String> getLocalPoolsDetails() {
101 Map<String, String> map = new HashMap<>();
102 localPool.entrySet().stream().forEach(entry -> map.put(entry.getKey(), entry.getValue().toString()));
107 public void start() {
108 LOG.info("{} start", getClass().getSimpleName());
112 public void close() throws Exception {
113 LOG.info("{} close", getClass().getSimpleName());
116 private void populateCache() throws ReadFailedException {
117 // If IP changes during reboot, then there will be orphaned child pools.
118 InstanceIdentifier<IdPools> idPoolsInstance = idUtils.getIdPools();
119 Optional<IdPools> idPoolsOptional = singleTxDB.syncReadOptional(CONFIGURATION, idPoolsInstance);
120 if (!idPoolsOptional.isPresent()) {
123 IdPools idPools = idPoolsOptional.get();
124 List<IdPool> idPoolList = idPools.getIdPool();
127 .filter(idPool -> idPool.getParentPoolName() != null
128 && !idPool.getParentPoolName().isEmpty()
129 && idUtils.getLocalPoolName(idPool.getParentPoolName())
130 .equals(idPool.getPoolName()))
132 idPool -> updateLocalIdPoolCache(idPool,
133 idPool.getParentPoolName()));
136 public boolean updateLocalIdPoolCache(IdPool idPool, String parentPoolName) {
137 AvailableIdsHolder availableIdsHolder = idPool.getAvailableIdsHolder();
138 AvailableIdHolder availableIdHolder = new AvailableIdHolder(idUtils, availableIdsHolder.getStart(),
139 availableIdsHolder.getEnd());
140 availableIdHolder.setCur(availableIdsHolder.getCursor());
141 ReleasedIdsHolder releasedIdsHolder = idPool.getReleasedIdsHolder();
142 ReleasedIdHolder releasedIdHolder = new ReleasedIdHolder(idUtils, releasedIdsHolder.getDelayedTimeSec());
143 releasedIdHolder.setAvailableIdCount(releasedIdsHolder.getAvailableIdCount());
144 List<DelayedIdEntries> delayedEntries = releasedIdsHolder.getDelayedIdEntries();
145 List<DelayedIdEntry> delayedIdEntryInCache = new ArrayList<>();
146 if (delayedEntries != null) {
147 delayedIdEntryInCache = delayedEntries
149 .map(delayedIdEntry -> new DelayedIdEntry(delayedIdEntry
150 .getId(), delayedIdEntry.getReadyTimeSec()))
151 .sorted((idEntry1, idEntry2) -> Long.compare(idEntry1.getReadyTimeSec(),
152 idEntry2.getReadyTimeSec())).collect(Collectors.toList());
154 releasedIdHolder.setDelayedEntries(delayedIdEntryInCache);
156 IdLocalPool idLocalPool = new IdLocalPool(idUtils, idPool.getPoolName());
157 idLocalPool.setAvailableIds(availableIdHolder);
158 idLocalPool.setReleasedIds(releasedIdHolder);
159 localPool.put(parentPoolName, idLocalPool);
160 if (LOG.isDebugEnabled()) {
161 LOG.debug("Populating cache for {} with {}", idLocalPool.getPoolName(), idLocalPool);
167 public Future<RpcResult<Void>> createIdPool(CreateIdPoolInput input) {
168 if (LOG.isDebugEnabled()) {
169 LOG.debug("createIdPool called with input {}", input);
171 String poolName = input.getPoolName();
172 long low = input.getLow();
173 long high = input.getHigh();
174 long blockSize = idUtils.computeBlockSize(low, high);
175 Future<RpcResult<Void>> futureResult;
177 idUtils.lockPool(lockManager, poolName);
178 WriteTransaction tx = broker.newWriteOnlyTransaction();
179 poolName = poolName.intern();
181 idPool = createGlobalPool(tx, poolName, low, high, blockSize);
182 String localPoolName = idUtils.getLocalPoolName(poolName);
183 IdLocalPool idLocalPool = localPool.get(poolName);
184 if (idLocalPool == null) {
185 createLocalPool(tx, localPoolName, idPool);
186 idUtils.updateChildPool(tx, idPool.getPoolName(), localPoolName);
188 tx.submit().checkedGet();
189 futureResult = RpcResultBuilder.<Void>success().buildFuture();
190 } catch (OperationFailedException | IdManagerException e) {
191 futureResult = buildFailedRpcResultFuture("createIdPool failed: " + input.toString(), e);
194 idUtils.unlockPool(lockManager, poolName);
195 } catch (IdManagerException e) {
196 futureResult = buildFailedRpcResultFuture("createIdPool unlockPool() failed: " + input.toString(), e);
203 public Future<RpcResult<AllocateIdOutput>> allocateId(AllocateIdInput input) {
204 if (LOG.isDebugEnabled()) {
205 LOG.debug("AllocateId called with input {}", input);
207 String idKey = input.getIdKey();
208 String poolName = input.getPoolName();
209 String localPoolName = idUtils.getLocalPoolName(poolName);
210 long newIdValue = -1;
211 AllocateIdOutputBuilder output = new AllocateIdOutputBuilder();
212 Future<RpcResult<AllocateIdOutput>> futureResult;
214 //allocateIdFromLocalPool method returns a list of IDs with one element. This element is obtained by get(0)
215 newIdValue = allocateIdFromLocalPool(poolName, localPoolName, idKey, 1).get(0);
216 output.setIdValue(newIdValue);
217 futureResult = RpcResultBuilder.<AllocateIdOutput>success().withResult(output.build()).buildFuture();
218 } catch (OperationFailedException | IdManagerException e) {
219 futureResult = buildFailedRpcResultFuture("allocateId failed: " + input.toString(), e);
225 public Future<RpcResult<AllocateIdRangeOutput>> allocateIdRange(AllocateIdRangeInput input) {
226 if (LOG.isDebugEnabled()) {
227 LOG.debug("AllocateIdRange called with input {}", input);
229 String idKey = input.getIdKey();
230 String poolName = input.getPoolName();
231 long size = input.getSize();
232 String localPoolName = idUtils.getLocalPoolName(poolName);
233 List<Long> newIdValuesList = new ArrayList<>();
234 AllocateIdRangeOutputBuilder output = new AllocateIdRangeOutputBuilder();
235 Future<RpcResult<AllocateIdRangeOutput>> futureResult;
237 newIdValuesList = allocateIdFromLocalPool(poolName, localPoolName, idKey, size);
238 Collections.sort(newIdValuesList);
239 output.setIdValues(newIdValuesList);
240 futureResult = RpcResultBuilder.<AllocateIdRangeOutput>success().withResult(output.build()).buildFuture();
241 } catch (OperationFailedException | IdManagerException e) {
242 futureResult = buildFailedRpcResultFuture("allocateIdRange failed: " + input.toString(), e);
248 public Future<RpcResult<Void>> deleteIdPool(DeleteIdPoolInput input) {
249 if (LOG.isDebugEnabled()) {
250 LOG.debug("DeleteIdPool called with input {}", input);
252 String poolName = input.getPoolName();
253 Future<RpcResult<Void>> futureResult;
255 InstanceIdentifier<IdPool> idPoolToBeDeleted = idUtils.getIdPoolInstance(poolName);
256 poolName = poolName.intern();
257 synchronized (poolName) {
258 IdPool idPool = singleTxDB.syncRead(CONFIGURATION, idPoolToBeDeleted);
259 List<ChildPools> childPoolList = idPool.getChildPools();
260 if (childPoolList != null) {
261 childPoolList.parallelStream().forEach(childPool -> deletePool(childPool.getChildPoolName()));
263 singleTxDB.syncDelete(CONFIGURATION, idPoolToBeDeleted);
264 if (LOG.isDebugEnabled()) {
265 LOG.debug("Deleted id pool {}", poolName);
268 futureResult = RpcResultBuilder.<Void>success().buildFuture();
269 } catch (OperationFailedException e) {
270 futureResult = buildFailedRpcResultFuture("deleteIdPool failed: " + input.toString(), e);
276 public Future<RpcResult<Void>> releaseId(ReleaseIdInput input) {
277 String poolName = input.getPoolName();
278 String idKey = input.getIdKey();
279 Future<RpcResult<Void>> futureResult;
281 releaseIdFromLocalPool(poolName, idUtils.getLocalPoolName(poolName), idKey);
282 futureResult = RpcResultBuilder.<Void>success().buildFuture();
283 } catch (ReadFailedException | IdManagerException e) {
284 futureResult = buildFailedRpcResultFuture("releaseId failed: " + input.toString(), e);
289 private <T> ListenableFuture<RpcResult<T>> buildFailedRpcResultFuture(String msg, Exception exception) {
290 LOG.error(msg, exception);
291 RpcResultBuilder<T> failedRpcResultBuilder = RpcResultBuilder.failed();
292 failedRpcResultBuilder.withError(ErrorType.APPLICATION, msg, exception);
293 if (exception instanceof OperationFailedException) {
294 failedRpcResultBuilder.withRpcErrors(((OperationFailedException) exception).getErrorList());
296 return failedRpcResultBuilder.buildFuture();
/**
 * Returns the ids bound to {@code idKey} in the parent pool, allocating them from the cached local
 * pool when the key has no stored entry yet.
 *
 * <p>If an {@code IdEntries} object already exists for the key, its value list is returned as is.
 * Otherwise the local pool is looked up in {@code localPool} (and created under the pool lock if it
 * is missing), ids are drawn through {@code getIdFromLocalPoolCache}, a latch is registered in
 * {@code idUtils.releaseIdLatchMap}, and an {@code UpdateIdEntryJob} is enqueued for the parent pool.
 *
 * @param parentPoolName parent pool name; also the key of the local pool cache
 * @param localPoolName name of this node's child pool
 * @param idKey key the ids are bound to
 * @param size number of ids requested
 * @return the ids for {@code idKey}
 * @throws OperationFailedException declared; raised by the datastore and allocation helpers
 * @throws IdManagerException when the pool is reported as exhausted
 */
// NOTE(review): this listing omits several short lines (braces, branch and loop headers). Confirm the
// exact control flow against the complete file before changing anything here.
299 private List<Long> allocateIdFromLocalPool(String parentPoolName, String localPoolName, String idKey, long size)
300 throws OperationFailedException, IdManagerException {
301 if (LOG.isDebugEnabled()) {
302 LOG.debug("Allocating id from local pool {}. Parent pool {}. Idkey {}", localPoolName, parentPoolName,
305 long newIdValue = -1;
306 List<Long> newIdValuesList = new ArrayList<>();
307 localPoolName = localPoolName.intern();
308 InstanceIdentifier<IdPool> parentIdPoolInstanceIdentifier = idUtils.getIdPoolInstance(parentPoolName);
309 InstanceIdentifier<IdEntries> existingId = idUtils.getIdEntry(parentIdPoolInstanceIdentifier, idKey);
310 Optional<IdEntries> existingIdEntry = singleTxDB.syncReadOptional(CONFIGURATION, existingId);
// Allocation is keyed: a key that already has an entry gets its stored ids back.
311 if (existingIdEntry.isPresent()) {
312 newIdValuesList = existingIdEntry.get().getIdValue();
313 if (LOG.isDebugEnabled()) {
314 LOG.debug("Existing ids {} for the key {} ", newIdValuesList, idKey);
316 return newIdValuesList;
318 //This get will not help in concurrent reads. Hence the same read needs to be done again.
319 IdLocalPool localIdPool = localPool.get(parentPoolName);
320 if (localIdPool == null) {
321 idUtils.lockPool(lockManager, parentPoolName);
323 //Check if a previous thread that got the cluster-wide lock first, has created the localPool
324 if (localPool.get(parentPoolName) == null) {
325 WriteTransaction tx = broker.newWriteOnlyTransaction();
326 IdPool parentIdPool = singleTxDB.syncRead(CONFIGURATION, parentIdPoolInstanceIdentifier);
327 localIdPool = createLocalPool(tx, localPoolName, parentIdPool); // Return localIdPool.....
328 tx.submit().checkedGet();
330 localIdPool = localPool.get(parentPoolName);
333 idUtils.unlockPool(lockManager, parentPoolName);
336 if (LOG.isDebugEnabled()) {
337 LOG.debug("Got pool {}", localIdPool);
// Single-id draw from the local cache.
340 newIdValue = getIdFromLocalPoolCache(localIdPool, parentPoolName);
341 newIdValuesList.add(newIdValue);
// Capacity check: ids in the local pool plus the parent's released and available ids.
343 IdPool parentIdPool = singleTxDB.syncRead(CONFIGURATION, parentIdPoolInstanceIdentifier);
344 long totalAvailableIdCount = localIdPool.getAvailableIds().getAvailableIdCount()
345 + localIdPool.getReleasedIds().getAvailableIdCount();
346 AvailableIdsHolderBuilder availableParentIds = idUtils.getAvailableIdsHolderBuilder(parentIdPool);
347 ReleasedIdsHolderBuilder releasedParentIds = idUtils.getReleaseIdsHolderBuilder(parentIdPool);
348 totalAvailableIdCount = totalAvailableIdCount + releasedParentIds.getAvailableIdCount()
349 + idUtils.getAvailableIdsCount(availableParentIds);
// NOTE(review): the comparison is strict, so a request for exactly the remaining number of ids does
// not take this branch - confirm that is intended.
350 if (totalAvailableIdCount > size) {
353 newIdValue = getIdFromLocalPoolCache(localIdPool, parentPoolName);
// NOTE(review): no return or rethrow is visible after the rollback below, so the previous newIdValue
// may still be appended afterwards - verify against the complete file.
354 } catch (OperationFailedException e) {
355 if (LOG.isDebugEnabled()) {
356 LOG.debug("Releasing IDs to pool {}", localPoolName);
358 // Releasing the IDs added in newIdValuesList since a null list would be returned now, as the
359 // requested size of list IDs exceeds the number of available IDs.
360 updateDelayedEntriesInLocalCache(newIdValuesList, parentPoolName, localIdPool);
362 newIdValuesList.add(newIdValue);
366 throw new IdManagerException(String.format("Ids exhausted for pool : %s", parentPoolName));
369 if (LOG.isDebugEnabled()) {
370 LOG.debug("The newIdValues {} for the idKey {}", newIdValuesList, idKey);
// The latch registered here is the one releaseIdFromLocalPool waits on (keyed by pool name + id key).
372 idUtils.releaseIdLatchMap.put(parentPoolName + idKey, new CountDownLatch(1));
373 UpdateIdEntryJob job = new UpdateIdEntryJob(parentPoolName, localPoolName, idKey, newIdValuesList, broker,
375 DataStoreJobCoordinator.getInstance().enqueueJob(parentPoolName, job, IdUtils.RETRY_COUNT);
376 return newIdValuesList;
/**
 * Takes one id from the cached local pool.
 *
 * <p>The released-id holder is tried first, then the available-id holder. Each successful allocation
 * enqueues an {@code IdHolderSyncJob} for the local pool. When neither holder yields an id, a block
 * is requested through {@code getIdBlockFromParentPool}.
 *
 * @param localIdPool cached local pool to allocate from
 * @param parentPoolName parent pool used to refill the local pool
 * @return the allocated id
 * @throws OperationFailedException declared; raised by the refill path
 * @throws IdManagerException when no id block can be obtained
 */
// NOTE(review): the lines that decide on retrying after a refill and that guard the exhaustion throw
// are not visible in this listing - check them in the complete file.
379 private Long getIdFromLocalPoolCache(IdLocalPool localIdPool, String parentPoolName)
380 throws OperationFailedException, IdManagerException {
// Released ids are reused before fresh ones.
382 IdHolder releasedIds = localIdPool.getReleasedIds();
383 Optional<Long> releasedId = releasedIds.allocateId();
384 if (releasedId.isPresent()) {
385 IdHolderSyncJob poolSyncJob = new IdHolderSyncJob(localIdPool.getPoolName(), releasedIds, broker,
387 DataStoreJobCoordinator.getInstance().enqueueJob(localIdPool.getPoolName(), poolSyncJob,
388 IdUtils.RETRY_COUNT);
389 return releasedId.get();
391 IdHolder availableIds = localIdPool.getAvailableIds();
392 if (availableIds != null) {
393 Optional<Long> availableId = availableIds.allocateId();
394 if (availableId.isPresent()) {
395 IdHolderSyncJob poolSyncJob = new IdHolderSyncJob(localIdPool.getPoolName(), availableIds, broker,
397 DataStoreJobCoordinator.getInstance().enqueueJob(localIdPool.getPoolName(), poolSyncJob,
398 IdUtils.RETRY_COUNT);
399 return availableId.get();
// Both holders are empty: ask the parent pool for a new block.
402 long idCount = getIdBlockFromParentPool(parentPoolName, localIdPool);
404 if (LOG.isDebugEnabled()) {
405 LOG.debug("Unable to allocate Id block from global pool");
407 throw new IdManagerException(String.format("Ids exhausted for pool : %s", parentPoolName));
413 * Changes made to availableIds and releasedIds will not be persisted to the datastore.
415 private long getIdBlockFromParentPool(String parentPoolName, IdLocalPool localIdPool)
416 throws OperationFailedException, IdManagerException {
417 if (LOG.isDebugEnabled()) {
418 LOG.debug("Allocating block of id from parent pool {}", parentPoolName);
420 InstanceIdentifier<IdPool> idPoolInstanceIdentifier = idUtils.getIdPoolInstance(parentPoolName);
421 parentPoolName = parentPoolName.intern();
422 idUtils.lockPool(lockManager, parentPoolName);
425 // Check if the childpool already got id block.
426 long availableIdCount =
427 localIdPool.getAvailableIds().getAvailableIdCount()
428 + localIdPool.getReleasedIds().getAvailableIdCount();
429 if (availableIdCount > 0) {
430 return availableIdCount;
432 WriteTransaction tx = broker.newWriteOnlyTransaction();
433 IdPool parentIdPool = singleTxDB.syncRead(CONFIGURATION, idPoolInstanceIdentifier);
434 idCount = allocateIdBlockFromParentPool(localIdPool, parentIdPool, tx);
435 tx.submit().checkedGet();
436 } catch (IdManagerException | NullPointerException e) {
437 LOG.error("Error getting id block from parent pool. {}", e.getMessage());
439 idUtils.unlockPool(lockManager, parentPoolName);
/**
 * Obtains a block of ids for the local pool from the parent pool.
 *
 * <p>Three sources are consulted in the order written: the parent's released ids, the parent's
 * available ids, and ids held by other child pools.
 *
 * @param localPoolCache cached local pool that receives the block
 * @param parentIdPool datastore representation of the parent pool
 * @param tx write transaction the parent-pool updates are merged into
 * @return number of ids obtained
 * @throws OperationFailedException declared; raised when reading other child pools fails
 * @throws IdManagerException when the parent pool is reported as exhausted
 */
// NOTE(review): the idCount declaration and the conditions between the three attempts are not visible
// in this listing - check them in the complete file.
444 private long allocateIdBlockFromParentPool(IdLocalPool localPoolCache, IdPool parentIdPool, WriteTransaction tx)
445 throws OperationFailedException, IdManagerException {
447 ReleasedIdsHolderBuilder releasedIdsBuilderParent = idUtils.getReleaseIdsHolderBuilder(parentIdPool);
// 1) ids previously released back to the parent
449 idCount = allocateIdBlockFromReleasedIdsHolder(localPoolCache, releasedIdsBuilderParent, parentIdPool, tx);
// 2) ids never handed out by the parent
453 idCount = allocateIdBlockFromAvailableIdsHolder(localPoolCache, parentIdPool, tx);
// 3) ids reclaimed from other child pools into the parent's released holder
457 idCount = getIdsFromOtherChildPools(releasedIdsBuilderParent, parentIdPool);
459 if (LOG.isDebugEnabled()) {
460 LOG.debug("Unable to allocate Id block from global pool");
462 throw new IdManagerException(String.format("Ids exhausted for pool : %s", parentIdPool.getPoolName()));
/**
 * Moves ids held by another child pool of {@code parentIdPool} into the parent's released-ids builder.
 *
 * <p>Child pools are ordered by last access time. For a child other than this node's local pool, its
 * delayed entries and all of its remaining available ids are appended to the parent's delayed
 * entries, and the emptied child pool is written back with {@code syncUpdate}.
 *
 * @param releasedIdsBuilderParent builder for the parent's released-ids holder, updated in place
 * @param parentIdPool datastore representation of the parent pool
 * @return the count computed for the reclaimed child pool
 * @throws OperationFailedException if reading or updating a child pool fails
 */
// NOTE(review): the statement executed when a child is still within DEFAULT_IDLE_TIME, and the value
// returned when no child qualifies, are not visible in this listing.
467 private long getIdsFromOtherChildPools(ReleasedIdsHolderBuilder releasedIdsBuilderParent, IdPool parentIdPool)
468 throws OperationFailedException {
// NOTE(review): the list is sorted in place and dereferenced without a null check - confirm that
// getChildPools() is never null or unmodifiable here.
469 List<ChildPools> childPoolsList = parentIdPool.getChildPools();
470 // Sorting the child pools on last accessed time so that the pool that
471 // was not accessed for a long time comes first.
472 Collections.sort(childPoolsList,
473 (childPool1, childPool2) -> childPool1.getLastAccessTime().compareTo(childPool2.getLastAccessTime()));
// Epoch seconds, the same unit as DEFAULT_IDLE_TIME.
474 long currentTime = System.currentTimeMillis() / 1000;
475 for (ChildPools childPools : childPoolsList) {
476 if (childPools.getLastAccessTime() + DEFAULT_IDLE_TIME > currentTime) {
// This node's own local pool is skipped.
479 if (!childPools.getChildPoolName().equals(idUtils.getLocalPoolName(parentIdPool.getPoolName()))) {
480 InstanceIdentifier<IdPool> idPoolInstanceIdentifier = idUtils
481 .getIdPoolInstance(childPools.getChildPoolName());
482 IdPool otherChildPool = singleTxDB.syncRead(CONFIGURATION, idPoolInstanceIdentifier);
483 ReleasedIdsHolderBuilder releasedIds = idUtils.getReleaseIdsHolderBuilder(otherChildPool);
485 List<DelayedIdEntries> delayedIdEntriesChild = releasedIds.getDelayedIdEntries();
486 List<DelayedIdEntries> delayedIdEntriesParent = releasedIdsBuilderParent.getDelayedIdEntries();
487 if (delayedIdEntriesParent == null) {
488 delayedIdEntriesParent = new LinkedList<>();
// NOTE(review): delayedIdEntriesChild is used without a null check, unlike the parent list above.
490 delayedIdEntriesParent.addAll(delayedIdEntriesChild);
// Removing a list's own elements from itself empties it.
491 delayedIdEntriesChild.removeAll(delayedIdEntriesChild);
// Drain every remaining available id of the child into the parent's delayed entries.
493 AvailableIdsHolderBuilder availableIds = idUtils.getAvailableIdsHolderBuilder(otherChildPool);
494 while (idUtils.isIdAvailable(availableIds)) {
495 long cursor = availableIds.getCursor() + 1;
496 delayedIdEntriesParent.add(idUtils.createDelayedIdEntry(cursor, currentTime));
497 availableIds.setCursor(cursor);
// NOTE(review): this count is taken after the child's delayed entries were emptied and its cursor was
// advanced to the end, so it presumably evaluates to 0 rather than the number of ids moved - verify.
500 long totalAvailableIdCount = releasedIds.getDelayedIdEntries().size()
501 + idUtils.getAvailableIdsCount(availableIds);
502 long count = releasedIdsBuilderParent.getAvailableIdCount() + totalAvailableIdCount;
503 releasedIdsBuilderParent.setDelayedIdEntries(delayedIdEntriesParent).setAvailableIdCount(count);
// Persist the drained child pool; the parent builder itself is not written here.
504 singleTxDB.syncUpdate(CONFIGURATION, idPoolInstanceIdentifier,
505 new IdPoolBuilder().setKey(new IdPoolKey(otherChildPool.getPoolName()))
506 .setAvailableIdsHolder(availableIds.build()).setReleasedIdsHolder(releasedIds.build())
508 return totalAvailableIdCount;
/**
 * Moves up to one block of delayed released ids from the parent pool into the local pool's
 * released-id holder and merges the reduced parent holder into {@code tx}.
 *
 * @param localIdPool cached local pool that receives the ids
 * @param releasedIdsBuilderParent builder for the parent's released-ids holder, updated in place
 * @param parentIdPool datastore representation of the parent pool (supplies block size and name)
 * @param tx write transaction the parent update is merged into
 * @return number of ids moved
 */
// NOTE(review): the value returned when the parent has no released ids, and the final return, are not
// visible in this listing.
514 private long allocateIdBlockFromReleasedIdsHolder(IdLocalPool localIdPool,
515 ReleasedIdsHolderBuilder releasedIdsBuilderParent, IdPool parentIdPool, WriteTransaction tx) {
516 if (releasedIdsBuilderParent.getAvailableIdCount() == 0) {
517 if (LOG.isDebugEnabled()) {
518 LOG.debug("Ids unavailable in releasedIds of parent pool {}", parentIdPool);
522 List<DelayedIdEntries> delayedIdEntriesParent = releasedIdsBuilderParent.getDelayedIdEntries();
// Take at most one block, or fewer if the parent holds fewer delayed entries.
523 int idCount = Math.min(delayedIdEntriesParent.size(), parentIdPool.getBlockSize());
// subList returns a view backed by delayedIdEntriesParent, not a copy.
524 List<DelayedIdEntries> idEntriesToBeRemoved = delayedIdEntriesParent.subList(0, idCount);
525 ReleasedIdHolder releasedIds = (ReleasedIdHolder) localIdPool.getReleasedIds();
// NOTE(review): the list read here is overwritten by the next statement, so the local pool's existing
// delayed entries are replaced rather than extended - confirm that is intended.
526 List<DelayedIdEntry> delayedIdEntriesLocalCache = releasedIds.getDelayedEntries();
527 delayedIdEntriesLocalCache = idEntriesToBeRemoved
529 .map(delayedIdEntry -> new DelayedIdEntry(delayedIdEntry
530 .getId(), delayedIdEntry.getReadyTimeSec()))
531 .sorted((idEntry1, idEntry2) -> Long.compare(idEntry1.getReadyTimeSec(),
532 idEntry2.getReadyTimeSec())).collect(Collectors.toList());
533 releasedIds.setDelayedEntries(delayedIdEntriesLocalCache);
534 releasedIds.setAvailableIdCount(releasedIds.getAvailableIdCount() + idCount);
535 localIdPool.setReleasedIds(releasedIds);
// NOTE(review): removeAll receives a view of the very list it modifies; this relies on the backing
// list being modifiable and tolerating that - consider copying the sub-list first.
536 delayedIdEntriesParent.removeAll(idEntriesToBeRemoved);
537 releasedIdsBuilderParent.setDelayedIdEntries(delayedIdEntriesParent);
538 InstanceIdentifier<ReleasedIdsHolder> releasedIdsHolderInstanceIdentifier = InstanceIdentifier
539 .builder(IdPools.class).child(IdPool.class,
540 new IdPoolKey(parentIdPool.getPoolName())).child(ReleasedIdsHolder.class).build();
541 releasedIdsBuilderParent.setAvailableIdCount(releasedIdsBuilderParent.getAvailableIdCount() - idCount);
542 if (LOG.isDebugEnabled()) {
543 LOG.debug("Allocated {} ids from releasedIds of parent pool {}", idCount, parentIdPool);
// The reduced parent holder becomes visible only once the caller submits tx.
545 tx.merge(CONFIGURATION, releasedIdsHolderInstanceIdentifier,
546 releasedIdsBuilderParent.build(), true);
/**
 * Carves a block of never-used ids out of the parent pool's available range, installs it as the
 * local pool's available-id holder, and merges the advanced parent cursor into {@code tx}.
 *
 * @param localIdPool cached local pool that receives the block
 * @param parentIdPool datastore representation of the parent pool (supplies block size and name)
 * @param tx write transaction the parent update is merged into
 * @return number of ids in the block
 */
// NOTE(review): the idCount declaration and the value returned when the parent has no available ids
// are not visible in this listing.
550 private long allocateIdBlockFromAvailableIdsHolder(IdLocalPool localIdPool, IdPool parentIdPool,
551 WriteTransaction tx) {
553 AvailableIdsHolderBuilder availableIdsBuilderParent = idUtils.getAvailableIdsHolderBuilder(parentIdPool);
554 long end = availableIdsBuilderParent.getEnd();
555 long cur = availableIdsBuilderParent.getCursor();
556 if (!idUtils.isIdAvailable(availableIdsBuilderParent)) {
557 if (LOG.isDebugEnabled()) {
558 LOG.debug("Ids exhausted in parent pool {}", parentIdPool);
562 // Update availableIdsHolder of Local Pool
// Block size is capped by what is left between the parent's cursor and its end.
563 idCount = Math.min(end - cur, parentIdPool.getBlockSize());
// The local block is constructed with the bounds cur + 1 and cur + idCount.
564 AvailableIdHolder availableIds = new AvailableIdHolder(idUtils, cur + 1, cur + idCount);
565 localIdPool.setAvailableIds(availableIds);
566 // Update availableIdsHolder of Global Pool
567 InstanceIdentifier<AvailableIdsHolder> availableIdsHolderInstanceIdentifier = InstanceIdentifier
568 .builder(IdPools.class).child(IdPool.class,
569 new IdPoolKey(parentIdPool.getPoolName())).child(AvailableIdsHolder.class).build();
570 availableIdsBuilderParent.setCursor(cur + idCount);
571 if (LOG.isDebugEnabled()) {
572 LOG.debug("Allocated {} ids from availableIds of global pool {}", idCount, parentIdPool);
// The advanced cursor becomes visible only once the caller submits tx.
574 tx.merge(CONFIGURATION, availableIdsHolderInstanceIdentifier,
575 availableIdsBuilderParent.build(), true);
579 private void releaseIdFromLocalPool(String parentPoolName, String localPoolName, String idKey)
580 throws ReadFailedException, IdManagerException {
581 String idLatchKey = parentPoolName + idKey;
582 java.util.Optional.ofNullable(idUtils.releaseIdLatchMap.get(idLatchKey)).ifPresent(latch -> {
584 latch.await(5, TimeUnit.SECONDS);
585 } catch (InterruptedException ignored) {
586 LOG.warn("Thread interrupted while releasing id {} from id pool {}", idKey, parentPoolName);
588 idUtils.releaseIdLatchMap.remove(idLatchKey);
591 localPoolName = localPoolName.intern();
592 InstanceIdentifier<IdPool> parentIdPoolInstanceIdentifier = idUtils.getIdPoolInstance(parentPoolName);
593 IdPool parentIdPool = singleTxDB.syncRead(CONFIGURATION, parentIdPoolInstanceIdentifier);
594 List<IdEntries> idEntries = parentIdPool.getIdEntries();
595 List<IdEntries> newIdEntries = idEntries;
596 if (idEntries == null) {
597 throw new IdManagerException("Id Entries does not exist");
599 InstanceIdentifier<IdEntries> existingId = idUtils.getIdEntry(parentIdPoolInstanceIdentifier, idKey);
600 Optional<IdEntries> existingIdEntryObject = singleTxDB.syncReadOptional(CONFIGURATION, existingId);
601 if (!existingIdEntryObject.isPresent()) {
602 throw new IdManagerException(
603 String.format("Specified Id key %s does not exist in id pool %s", idKey, parentPoolName));
605 IdEntries existingIdEntry = existingIdEntryObject.get();
606 List<Long> idValuesList = existingIdEntry.getIdValue();
607 IdLocalPool localIdPoolCache = localPool.get(parentPoolName);
608 boolean isRemoved = newIdEntries.remove(existingIdEntry);
609 if (LOG.isDebugEnabled()) {
610 LOG.debug("The entry {} is removed {}", existingIdEntry, isRemoved);
612 updateDelayedEntriesInLocalCache(idValuesList, parentPoolName, localIdPoolCache);
613 IdHolderSyncJob poolSyncJob = new IdHolderSyncJob(localPoolName, localIdPoolCache.getReleasedIds(), broker,
615 DataStoreJobCoordinator.getInstance().enqueueJob(localPoolName, poolSyncJob, IdUtils.RETRY_COUNT);
616 scheduleCleanUpTask(localIdPoolCache, parentPoolName, parentIdPool.getBlockSize());
617 if (LOG.isDebugEnabled()) {
618 LOG.debug("Released id ({}, {}) from pool {}", idKey, idValuesList, localPoolName);
620 // Updating id entries in the parent pool. This will be used for restart scenario
621 UpdateIdEntryJob job = new UpdateIdEntryJob(parentPoolName, localPoolName, idKey, null, broker, idUtils);
622 DataStoreJobCoordinator.getInstance().enqueueJob(parentPoolName, job, IdUtils.RETRY_COUNT);
625 private void scheduleCleanUpTask(final IdLocalPool localIdPoolCache,
626 final String parentPoolName, final int blockSize) {
627 TimerTask scheduledTask = new TimerTask() {
630 CleanUpJob job = new CleanUpJob(localIdPoolCache, broker, parentPoolName, blockSize, lockManager,
632 DataStoreJobCoordinator.getInstance().enqueueJob(localIdPoolCache.getPoolName(), job,
633 IdUtils.RETRY_COUNT);
636 cleanJobTimer.schedule(scheduledTask, IdUtils.DEFAULT_DELAY_TIME * 1000);
639 private IdPool createGlobalPool(WriteTransaction tx, String poolName, long low, long high, long blockSize)
640 throws ReadFailedException {
642 InstanceIdentifier<IdPool> idPoolInstanceIdentifier = idUtils.getIdPoolInstance(poolName);
643 Optional<IdPool> existingIdPool = singleTxDB.syncReadOptional(CONFIGURATION, idPoolInstanceIdentifier);
644 if (!existingIdPool.isPresent()) {
645 if (LOG.isDebugEnabled()) {
646 LOG.debug("Creating new global pool {}", poolName);
648 idPool = idUtils.createGlobalPool(poolName, low, high, blockSize);
649 tx.put(CONFIGURATION, idPoolInstanceIdentifier, idPool, true);
651 idPool = existingIdPool.get();
652 if (LOG.isDebugEnabled()) {
653 LOG.debug("GlobalPool exists {}", idPool);
659 private IdLocalPool createLocalPool(WriteTransaction tx, String localPoolName, IdPool idPool)
660 throws OperationFailedException, IdManagerException {
661 localPoolName = localPoolName.intern();
662 IdLocalPool idLocalPool = new IdLocalPool(idUtils, localPoolName);
663 allocateIdBlockFromParentPool(idLocalPool, idPool, tx);
664 String parentPool = idPool.getPoolName();
665 localPool.put(parentPool, idLocalPool);
666 LocalPoolCreateJob job = new LocalPoolCreateJob(idLocalPool, broker, idPool.getPoolName(),
667 idPool.getBlockSize(), idUtils);
668 DataStoreJobCoordinator.getInstance().enqueueJob(localPoolName, job, IdUtils.RETRY_COUNT);
672 private void deletePool(String poolName) {
673 LocalPoolDeleteJob job = new LocalPoolDeleteJob(poolName, broker, idUtils);
674 DataStoreJobCoordinator.getInstance().enqueueJob(poolName, job, IdUtils.RETRY_COUNT);
677 public void poolDeleted(String parentPoolName, String poolName) {
678 IdLocalPool idLocalPool = localPool.get(parentPoolName);
679 if (idLocalPool != null) {
680 if (idLocalPool.getPoolName().equals(poolName)) {
681 localPool.remove(parentPoolName);
686 private void updateDelayedEntriesInLocalCache(List<Long> idsList, String parentPoolName,
687 IdLocalPool localPoolCache) {
688 for (long idValue : idsList) {
689 localPoolCache.getReleasedIds().addId(idValue);
691 localPool.put(parentPoolName, localPoolCache);