2 * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.genius.idmanager;
11 import static java.util.Comparator.comparing;
12 import static java.util.stream.Collectors.toCollection;
13 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
15 import com.google.common.base.Optional;
16 import com.google.common.util.concurrent.ListenableFuture;
18 import java.util.ArrayList;
19 import java.util.Collections;
20 import java.util.HashMap;
21 import java.util.LinkedList;
22 import java.util.List;
24 import java.util.Timer;
25 import java.util.TimerTask;
26 import java.util.concurrent.CompletableFuture;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.ConcurrentMap;
29 import java.util.concurrent.CopyOnWriteArrayList;
30 import java.util.concurrent.CountDownLatch;
31 import java.util.concurrent.ExecutionException;
32 import java.util.concurrent.Future;
33 import java.util.concurrent.TimeUnit;
35 import javax.annotation.PostConstruct;
36 import javax.annotation.PreDestroy;
37 import javax.inject.Inject;
38 import javax.inject.Singleton;
40 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
41 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
42 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
43 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
44 import org.opendaylight.daexim.DataImportBootReady;
45 import org.opendaylight.genius.datastoreutils.DataStoreJobCoordinator;
46 import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
47 import org.opendaylight.genius.idmanager.ReleasedIdHolder.DelayedIdEntry;
48 import org.opendaylight.genius.idmanager.api.IdManagerMonitor;
49 import org.opendaylight.genius.idmanager.jobs.CleanUpJob;
50 import org.opendaylight.genius.idmanager.jobs.IdHolderSyncJob;
51 import org.opendaylight.genius.idmanager.jobs.LocalPoolCreateJob;
52 import org.opendaylight.genius.idmanager.jobs.LocalPoolDeleteJob;
53 import org.opendaylight.genius.idmanager.jobs.UpdateIdEntryJob;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdInput;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdOutput;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdOutputBuilder;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdRangeInput;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdRangeOutput;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdRangeOutputBuilder;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.CreateIdPoolInput;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.DeleteIdPoolInput;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.IdManagerService;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.IdPools;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.ReleaseIdInput;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.IdPool;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.IdPoolBuilder;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.IdPoolKey;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.AvailableIdsHolder;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.AvailableIdsHolderBuilder;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.ChildPools;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.IdEntries;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.ReleasedIdsHolder;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.ReleasedIdsHolderBuilder;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.released.ids.DelayedIdEntries;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.LockManagerService;
76 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
77 import org.opendaylight.yangtools.yang.common.OperationFailedException;
78 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
79 import org.opendaylight.yangtools.yang.common.RpcResult;
80 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
81 import org.ops4j.pax.cdi.api.OsgiService;
82 import org.slf4j.Logger;
83 import org.slf4j.LoggerFactory;
/**
 * Implementation of {@link IdManagerService} and {@link IdManagerMonitor}.
 *
 * <p>Id pools are stored in the CONFIGURATION datastore; this class additionally keeps an
 * in-memory cache of local pools and allocates/releases ids from that cache, enqueueing
 * jobs on the {@link DataStoreJobCoordinator} to write the changes back.
 */
86 public class IdManager implements IdManagerService, IdManagerMonitor {
88 private static final Logger LOG = LoggerFactory.getLogger(IdManager.class);
// 24 hours expressed in seconds; compared against child-pool last-access times and
// System.currentTimeMillis() / 1000 in getIdsFromOtherChildPools.
89 private static final long DEFAULT_IDLE_TIME = 24 * 60 * 60;
// Used to create write-only transactions and handed to the datastore jobs.
91 private final DataBroker broker;
// Wrapper around the data broker used for the synchronous read/update/delete calls.
92 private final SingleTransactionDataBroker singleTxDB;
// Passed to IdUtils.lock / IdUtils.unlock and to several jobs.
93 private final LockManagerService lockManager;
94 private final IdUtils idUtils;
// Cache of local pools, keyed by the PARENT pool name (see the put calls in this class).
96 private final ConcurrentMap<String, IdLocalPool> localPool;
// Timer on which the delayed clean-up tasks of scheduleCleanUpTask are scheduled.
97 private final Timer cleanJobTimer = new Timer();
/**
 * Creates the id manager.
 *
 * @param db data broker; wrapped here in a {@link SingleTransactionDataBroker}
 * @param lockManager lock service stored for later IdUtils lock/unlock calls
 * @param idUtils helper used throughout this class
 * @param dataImportBootReady intentionally unused marker dependency, see the NB comment below
 * @throws ReadFailedException declared by the signature; NOTE(review): presumably raised by an
 *         initial datastore read performed during construction - confirm
 */
100 public IdManager(DataBroker db, LockManagerService lockManager, IdUtils idUtils,
101 @OsgiService DataImportBootReady dataImportBootReady) throws ReadFailedException {
103 this.singleTxDB = new SingleTransactionDataBroker(db);
104 this.lockManager = lockManager;
105 this.idUtils = idUtils;
107 // NB: We do not "use" the DataImportBootReady, but its presence in the OSGi
108 // Service Registry is the required "signal" that the Daexim "import on boot"
109 // has fully completed (which we want to wait for). Therefore, making this
110 // dependent on that defers the Blueprint initialization, as we'd like to,
111 // so that we do not start giving out new IDs before an import went in.
112 // Thus, please DO NOT remove the DataImportBootReady argument, even if
113 // it appears to be (is) un-used from a Java code PoV!
// The local pool cache starts out empty.
115 this.localPool = new ConcurrentHashMap<>();
/**
 * Builds a fresh map that associates every key of the local pool cache with the
 * {@code toString()} representation of the cached {@link IdLocalPool}.
 */
120 public Map<String, String> getLocalPoolsDetails() {
121 Map<String, String> map = new HashMap<>();
// Copy each cache entry into the result as key -> textual form of the pool.
122 localPool.entrySet().stream().forEach(entry -> map.put(entry.getKey(), entry.getValue().toString()));
/**
 * Logs an INFO message of the form "&lt;SimpleClassName&gt; start".
 */
127 public void start() {
128 LOG.info("{} start", getClass().getSimpleName());
/**
 * Logs an INFO message of the form "&lt;SimpleClassName&gt; close".
 */
132 public void close() {
133 LOG.info("{} close", getClass().getSimpleName());
/**
 * Reads the {@link IdPools} container from the CONFIGURATION datastore and loads into the
 * local cache the pools that are this node's local pool of their parent.
 *
 * @throws ReadFailedException if the datastore read fails
 */
136 private void populateCache() throws ReadFailedException {
137 // If IP changes during reboot, then there will be orphaned child pools.
138 InstanceIdentifier<IdPools> idPoolsInstance = idUtils.getIdPools();
139 Optional<IdPools> idPoolsOptional = singleTxDB.syncReadOptional(CONFIGURATION, idPoolsInstance);
// Nothing stored yet: there is nothing to cache.
140 if (!idPoolsOptional.isPresent()) {
143 IdPools idPools = idPoolsOptional.get();
144 List<IdPool> idPoolList = idPools.getIdPool();
// Keep only pools that have a non-empty parent pool name and whose own name equals the
// local pool name that IdUtils derives from that parent name.
147 .filter(idPool -> idPool.getParentPoolName() != null
148 && !idPool.getParentPoolName().isEmpty()
149 && idUtils.getLocalPoolName(idPool.getParentPoolName())
150 .equals(idPool.getPoolName()))
// Each remaining pool is cached under its parent pool name.
152 idPool -> updateLocalIdPoolCache(idPool,
153 idPool.getParentPoolName()));
/**
 * Rebuilds the in-memory {@link IdLocalPool} for the given datastore pool and stores it in
 * the local cache under {@code parentPoolName}, replacing any previous entry.
 *
 * @param idPool datastore representation of the local pool; its available-ids holder and
 *        released-ids holder are dereferenced without a null check
 * @param parentPoolName key under which the rebuilt pool is cached
 */
156 public boolean updateLocalIdPoolCache(IdPool idPool, String parentPoolName) {
// Mirror the available-id range and its cursor.
157 AvailableIdsHolder availableIdsHolder = idPool.getAvailableIdsHolder();
158 AvailableIdHolder availableIdHolder = new AvailableIdHolder(idUtils, availableIdsHolder.getStart(),
159 availableIdsHolder.getEnd());
160 availableIdHolder.setCur(availableIdsHolder.getCursor());
// Mirror the released-id holder: delay time, available count and delayed entries.
161 ReleasedIdsHolder releasedIdsHolder = idPool.getReleasedIdsHolder();
162 ReleasedIdHolder releasedIdHolder = new ReleasedIdHolder(idUtils, releasedIdsHolder.getDelayedTimeSec());
163 releasedIdHolder.setAvailableIdCount(releasedIdsHolder.getAvailableIdCount());
164 List<DelayedIdEntries> delayedEntries = releasedIdsHolder.getDelayedIdEntries();
// Defaults to an empty list when the datastore holds no delayed entries.
165 List<DelayedIdEntry> delayedIdEntryInCache = new CopyOnWriteArrayList<>();
166 if (delayedEntries != null) {
// Convert to cache entries, ordered by ascending ready time.
167 delayedIdEntryInCache = delayedEntries
169 .map(delayedIdEntry -> new DelayedIdEntry(delayedIdEntry
170 .getId(), delayedIdEntry.getReadyTimeSec()))
171 .sorted(comparing(DelayedIdEntry::getReadyTimeSec))
172 .collect(toCollection(CopyOnWriteArrayList::new));
174 releasedIdHolder.setDelayedEntries(delayedIdEntryInCache);
176 IdLocalPool idLocalPool = new IdLocalPool(idUtils, idPool.getPoolName());
177 idLocalPool.setAvailableIds(availableIdHolder);
178 idLocalPool.setReleasedIds(releasedIdHolder);
179 localPool.put(parentPoolName, idLocalPool);
180 if (LOG.isDebugEnabled()) {
181 LOG.debug("Populating cache for {} with {}", idLocalPool.getPoolName(), idLocalPool);
/**
 * RPC: creates the global pool described by the input (if it does not exist yet) and, when
 * this node has no cached local pool for it, a local pool registered as its child.
 *
 * <p>Failures of type {@link OperationFailedException} or {@link IdManagerException} are
 * converted into a failed RPC result rather than thrown.
 *
 * @param input pool name and the low/high bounds of the id range
 */
187 public Future<RpcResult<Void>> createIdPool(CreateIdPoolInput input) {
188 LOG.info("createIdPool called with input {}", input);
189 String poolName = input.getPoolName();
190 long low = input.getLow();
191 long high = input.getHigh();
// The block size is derived from the range by IdUtils.
192 long blockSize = idUtils.computeBlockSize(low, high);
193 Future<RpcResult<Void>> futureResult;
// Take the lock for this pool name; the matching unlock is further down.
195 idUtils.lock(lockManager, poolName);
196 WriteTransaction tx = broker.newWriteOnlyTransaction();
197 poolName = poolName.intern();
199 idPool = createGlobalPool(tx, poolName, low, high, blockSize);
200 String localPoolName = idUtils.getLocalPoolName(poolName);
// The cache is keyed by the parent (global) pool name.
201 IdLocalPool idLocalPool = localPool.get(poolName);
202 if (idLocalPool == null) {
203 createLocalPool(tx, localPoolName, idPool);
204 idUtils.updateChildPool(tx, idPool.getPoolName(), localPoolName);
// Commit synchronously; a commit failure surfaces as an OperationFailedException subtype.
206 tx.submit().checkedGet();
207 futureResult = RpcResultBuilder.<Void>success().buildFuture();
208 } catch (OperationFailedException | IdManagerException e) {
209 futureResult = buildFailedRpcResultFuture("createIdPool failed: " + input.toString(), e);
211 idUtils.unlock(lockManager, poolName);
/**
 * RPC: allocates a single id for the given id key from the given pool.
 *
 * <p>On {@link OperationFailedException} or {@link IdManagerException} any pending future
 * registered for this pool/key is completed exceptionally and a failed RPC result is built.
 *
 * @param input pool name and id key
 */
217 public Future<RpcResult<AllocateIdOutput>> allocateId(AllocateIdInput input) {
218 LOG.debug("AllocateId called with input {}", input);
219 String idKey = input.getIdKey();
220 String poolName = input.getPoolName();
221 String localPoolName = idUtils.getLocalPoolName(poolName);
222 long newIdValue = -1;
223 AllocateIdOutputBuilder output = new AllocateIdOutputBuilder();
224 Future<RpcResult<AllocateIdOutput>> futureResult;
226 //allocateIdFromLocalPool method returns a list of IDs with one element. This element is obtained by get(0)
227 newIdValue = allocateIdFromLocalPool(poolName, localPoolName, idKey, 1).get(0);
228 output.setIdValue(newIdValue);
229 futureResult = RpcResultBuilder.<AllocateIdOutput>success().withResult(output.build()).buildFuture();
230 } catch (OperationFailedException | IdManagerException e) {
// Wake up any caller waiting on the shared future for this key before reporting the failure.
231 completeExceptionallyIfPresent(poolName, idKey, e);
232 futureResult = buildFailedRpcResultFuture("allocateId failed: " + input.toString(), e);
/**
 * Removes the pending allocation future registered for the pool/key pair from
 * {@code idUtils.allocatedIdMap} and, if one was registered, completes it exceptionally.
 *
 * @param poolName parent pool name
 * @param idKey id key
 * @param exception failure to propagate to anyone waiting on the future
 */
237 private void completeExceptionallyIfPresent(String poolName, String idKey, Exception exception) {
238 CompletableFuture<List<Long>> completableFuture =
239 idUtils.allocatedIdMap.remove(idUtils.getUniqueKey(poolName, idKey));
240 if (completableFuture != null) {
241 completableFuture.completeExceptionally(exception);
/**
 * RPC: allocates {@code size} ids for the given id key from the given pool and returns them
 * sorted in ascending order.
 *
 * <p>On {@link OperationFailedException} or {@link IdManagerException} any pending future
 * registered for this pool/key is completed exceptionally and a failed RPC result is built.
 *
 * @param input pool name, id key and number of ids requested
 */
246 public Future<RpcResult<AllocateIdRangeOutput>> allocateIdRange(AllocateIdRangeInput input) {
247 if (LOG.isDebugEnabled()) {
248 LOG.debug("AllocateIdRange called with input {}", input);
250 String idKey = input.getIdKey();
251 String poolName = input.getPoolName();
252 long size = input.getSize();
253 String localPoolName = idUtils.getLocalPoolName(poolName);
254 List<Long> newIdValuesList = new ArrayList<>();
255 AllocateIdRangeOutputBuilder output = new AllocateIdRangeOutputBuilder();
256 Future<RpcResult<AllocateIdRangeOutput>> futureResult;
258 newIdValuesList = allocateIdFromLocalPool(poolName, localPoolName, idKey, size);
// Sorts the returned list in place.
259 Collections.sort(newIdValuesList);
260 output.setIdValues(newIdValuesList);
261 futureResult = RpcResultBuilder.<AllocateIdRangeOutput>success().withResult(output.build()).buildFuture();
262 } catch (OperationFailedException | IdManagerException e) {
263 completeExceptionallyIfPresent(poolName, idKey, e);
264 futureResult = buildFailedRpcResultFuture("allocateIdRange failed: " + input.toString(), e);
/**
 * RPC: deletes the named pool from the CONFIGURATION datastore and enqueues a delete job for
 * each of its child pools.
 *
 * <p>An {@link OperationFailedException} is converted into a failed RPC result.
 *
 * @param input name of the pool to delete
 */
270 public Future<RpcResult<Void>> deleteIdPool(DeleteIdPoolInput input) {
271 if (LOG.isDebugEnabled()) {
272 LOG.debug("DeleteIdPool called with input {}", input);
274 String poolName = input.getPoolName();
275 Future<RpcResult<Void>> futureResult;
277 InstanceIdentifier<IdPool> idPoolToBeDeleted = idUtils.getIdPoolInstance(poolName);
// Interning yields one canonical String per pool name, which is then used as the monitor.
278 poolName = poolName.intern();
279 synchronized (poolName) {
280 IdPool idPool = singleTxDB.syncRead(CONFIGURATION, idPoolToBeDeleted);
281 List<ChildPools> childPoolList = idPool.getChildPools();
282 if (childPoolList != null) {
// Child pools are removed asynchronously through LocalPoolDeleteJob.
283 childPoolList.stream().forEach(childPool -> deletePool(childPool.getChildPoolName()));
285 singleTxDB.syncDelete(CONFIGURATION, idPoolToBeDeleted);
286 if (LOG.isDebugEnabled()) {
287 LOG.debug("Deleted id pool {}", poolName);
290 futureResult = RpcResultBuilder.<Void>success().buildFuture();
291 } catch (OperationFailedException e) {
292 futureResult = buildFailedRpcResultFuture("deleteIdPool failed: " + input.toString(), e);
/**
 * RPC: releases the ids held under the given id key back to this node's local pool.
 *
 * <p>A {@link ReadFailedException} or {@link IdManagerException} is converted into a failed
 * RPC result.
 *
 * @param input pool name and id key to release
 */
298 public Future<RpcResult<Void>> releaseId(ReleaseIdInput input) {
299 String poolName = input.getPoolName();
300 String idKey = input.getIdKey();
301 LOG.info("Releasing ID {} from pool {}", idKey, poolName);
302 Future<RpcResult<Void>> futureResult;
303 String uniqueKey = idUtils.getUniqueKey(poolName, idKey);
// Lock on the pool/key pair for the duration of the release.
305 idUtils.lock(lockManager, uniqueKey);
306 releaseIdFromLocalPool(poolName, idUtils.getLocalPoolName(poolName), idKey);
307 futureResult = RpcResultBuilder.<Void>success().buildFuture();
308 } catch (ReadFailedException | IdManagerException e) {
309 futureResult = buildFailedRpcResultFuture("releaseId failed: " + input.toString(), e);
// NOTE(review): this is the only unlock visible in this method and it follows the failure
// handling; presumably the success path is unlocked by the enqueued UpdateIdEntryJob,
// which receives lockManager - confirm.
310 idUtils.unlock(lockManager, uniqueKey);
315 private <T> ListenableFuture<RpcResult<T>> buildFailedRpcResultFuture(String msg, Exception exception) {
316 if (exception instanceof IdDoesNotExistException) {
317 // Do not log full stack trace in case ID does not exist
318 LOG.error(msg, exception);
320 LOG.error(msg, exception);
322 RpcResultBuilder<T> failedRpcResultBuilder = RpcResultBuilder.failed();
323 failedRpcResultBuilder.withError(ErrorType.APPLICATION, msg, exception);
324 if (exception instanceof OperationFailedException) {
325 failedRpcResultBuilder.withRpcErrors(((OperationFailedException) exception).getErrorList());
327 return failedRpcResultBuilder.buildFuture();
/**
 * Allocates ids for {@code idKey} from this node's local pool of {@code parentPoolName}.
 *
 * <p>Visible flow: a new future is registered in {@code idUtils.allocatedIdMap} for the
 * pool/key pair. If another future was already registered, its result is returned instead.
 * Otherwise the datastore is checked for an existing id entry; if none exists, ids are taken
 * from the local pool, an {@link UpdateIdEntryJob} is enqueued and the registered future is
 * completed with the new ids.
 *
 * @param parentPoolName name of the parent (global) pool
 * @param localPoolName name of this node's local pool
 * @param idKey key the ids are allocated for
 * @param size number of ids requested
 * @return the ids associated with the key
 * @throws OperationFailedException on datastore failures
 * @throws IdManagerException if waiting on an existing allocation fails or ids cannot be allocated
 */
330 private List<Long> allocateIdFromLocalPool(String parentPoolName, String localPoolName,
331 String idKey, long size) throws OperationFailedException, IdManagerException {
332 LOG.debug("Allocating id from local pool {}. Parent pool {}. Idkey {}", localPoolName, parentPoolName, idKey);
333 String uniqueIdKey = idUtils.getUniqueKey(parentPoolName, idKey);
334 CompletableFuture<List<Long>> futureIdValues = new CompletableFuture<>();
// putIfAbsent returns the previously registered future, or null if ours was stored.
335 CompletableFuture<List<Long>> existingFutureIdValue =
336 idUtils.allocatedIdMap.putIfAbsent(uniqueIdKey, futureIdValues);
337 if (existingFutureIdValue != null) {
// Another allocation for the same key is in flight: block until it yields its ids.
339 return existingFutureIdValue.get();
340 } catch (InterruptedException | ExecutionException e) {
341 LOG.warn("Could not obtain id from existing futureIdValue for idKey {} and pool {}.",
342 idKey, parentPoolName);
343 throw new IdManagerException(e.getMessage(), e);
// existingFutureIdValue is null from here on (the non-null case returned or threw above).
// checkForIdInIdEntries takes the lock on uniqueIdKey.
347 List<Long> newIdValuesList = checkForIdInIdEntries(parentPoolName, idKey, uniqueIdKey, futureIdValues,
348 existingFutureIdValue);
// The key already has ids in the datastore: hand those back.
349 if (!newIdValuesList.isEmpty()) {
350 return newIdValuesList;
352 //This get will not help in concurrent reads. Hence the same read needs to be done again.
353 IdLocalPool localIdPool = getOrCreateLocalIdPool(parentPoolName, localPoolName);
354 LOG.info("Got pool {}", localIdPool);
355 long newIdValue = -1;
356 localPoolName = localPoolName.intern();
// NOTE(review): the condition choosing between the single-id path and the range path is
// not visible in this excerpt.
358 newIdValue = getIdFromLocalPoolCache(localIdPool, parentPoolName);
359 newIdValuesList.add(newIdValue);
361 getRangeOfIds(parentPoolName, localPoolName, size, newIdValuesList, localIdPool, newIdValue);
363 LOG.info("The newIdValues {} for the idKey {}", newIdValuesList, idKey);
// releaseIdFromLocalPool waits on this latch before releasing the same key.
364 idUtils.releaseIdLatchMap.put(uniqueIdKey, new CountDownLatch(1));
365 UpdateIdEntryJob job = new UpdateIdEntryJob(parentPoolName, localPoolName, idKey, newIdValuesList, broker,
366 idUtils, lockManager);
367 DataStoreJobCoordinator.getInstance().enqueueJob(parentPoolName, job, IdUtils.RETRY_COUNT);
// Publish the result to any caller waiting on the shared future.
368 futureIdValues.complete(newIdValuesList);
369 return newIdValuesList;
370 } catch (OperationFailedException | IdManagerException e) {
// Release the lock taken in checkForIdInIdEntries when allocation failed.
371 idUtils.unlock(lockManager, uniqueIdKey);
/**
 * Takes one id from the cached local pool: released ids are tried first, then available ids.
 * Whenever an id is handed out, an {@link IdHolderSyncJob} for the holder it came from is
 * enqueued. If neither holder yields an id, a new id block is requested from the parent pool.
 *
 * @param localIdPool cached local pool to allocate from
 * @param parentPoolName name of the parent pool, used to fetch a new block
 * @return the allocated id
 * @throws OperationFailedException on datastore failures while fetching a block
 * @throws IdManagerException with message "Ids exhausted for pool : ..." when no ids can be obtained
 */
376 private Long getIdFromLocalPoolCache(IdLocalPool localIdPool, String parentPoolName)
377 throws OperationFailedException, IdManagerException {
// NOTE(review): the retry structure around the statements below is not visible in this excerpt.
379 IdHolder releasedIds = localIdPool.getReleasedIds();
380 Optional<Long> releasedId = Optional.absent();
381 releasedId = releasedIds.allocateId();
382 if (releasedId.isPresent()) {
383 IdHolderSyncJob poolSyncJob =
384 new IdHolderSyncJob(localIdPool.getPoolName(), localIdPool.getReleasedIds(), broker,
386 DataStoreJobCoordinator.getInstance().enqueueJob(localIdPool.getPoolName(),
387 poolSyncJob, IdUtils.RETRY_COUNT);
388 return releasedId.get();
390 Optional<Long> availableId = Optional.absent();
391 IdHolder availableIds = localIdPool.getAvailableIds();
// The available-ids holder may be unset on the local pool.
392 if (availableIds != null) {
393 availableId = availableIds.allocateId();
394 if (availableId.isPresent()) {
395 IdHolderSyncJob poolSyncJob =
396 new IdHolderSyncJob(localIdPool.getPoolName(), localIdPool.getAvailableIds(),
398 DataStoreJobCoordinator.getInstance().enqueueJob(localIdPool.getPoolName(),
399 poolSyncJob, IdUtils.RETRY_COUNT);
400 return availableId.get();
// Local pool is empty: ask the parent pool for another block.
403 long idCount = getIdBlockFromParentPool(parentPoolName, localIdPool);
405 if (LOG.isDebugEnabled()) {
406 LOG.debug("Unable to allocate Id block from global pool {}", parentPoolName);
408 throw new IdManagerException(String.format("Ids exhausted for pool : %s", parentPoolName));
414 * Changes made to availableIds and releasedIds will not be persisted to the datastore.
// Refills the cached local pool from its parent pool while holding the lock on the parent
// pool name. If the local pool turns out to already have ids (available + released count
// greater than zero) that count is returned without touching the parent. Otherwise the
// parent pool is read from the CONFIGURATION datastore, a block is moved into the local
// pool and the parent-side changes are committed in a write-only transaction.
// IdManagerException and NullPointerException are caught and logged here, not propagated.
416 private long getIdBlockFromParentPool(String parentPoolName, IdLocalPool localIdPool)
417 throws OperationFailedException, IdManagerException {
418 if (LOG.isDebugEnabled()) {
419 LOG.debug("Allocating block of id from parent pool {}", parentPoolName);
421 InstanceIdentifier<IdPool> idPoolInstanceIdentifier = idUtils.getIdPoolInstance(parentPoolName);
422 parentPoolName = parentPoolName.intern();
423 idUtils.lock(lockManager, parentPoolName);
426 // Check if the childpool already got id block.
427 long availableIdCount =
428 localIdPool.getAvailableIds().getAvailableIdCount()
429 + localIdPool.getReleasedIds().getAvailableIdCount();
430 if (availableIdCount > 0) {
431 return availableIdCount;
433 WriteTransaction tx = broker.newWriteOnlyTransaction();
434 IdPool parentIdPool = singleTxDB.syncRead(CONFIGURATION, idPoolInstanceIdentifier);
435 idCount = allocateIdBlockFromParentPool(localIdPool, parentIdPool, tx);
// Commit synchronously; a failed commit surfaces as an exception.
436 tx.submit().checkedGet();
437 } catch (IdManagerException | NullPointerException e) {
438 LOG.error("Error getting id block from parent pool", e);
440 idUtils.unlock(lockManager, parentPoolName);
/**
 * Moves a block of ids from the parent pool into the cached local pool. Three sources are
 * consulted in this order: the parent's released ids, the parent's available ids, and ids
 * reclaimed from other child pools.
 *
 * @param localPoolCache cached local pool receiving the ids
 * @param parentIdPool datastore state of the parent pool
 * @param tx transaction that receives the parent-side updates
 * @throws OperationFailedException on datastore failures
 * @throws IdManagerException with message "Ids exhausted for pool : ..." when no ids can be obtained
 */
445 private long allocateIdBlockFromParentPool(IdLocalPool localPoolCache, IdPool parentIdPool, WriteTransaction tx)
446 throws OperationFailedException, IdManagerException {
448 ReleasedIdsHolderBuilder releasedIdsBuilderParent = idUtils.getReleaseIdsHolderBuilder(parentIdPool);
// NOTE(review): the checks on idCount between the three attempts are not visible in this excerpt.
450 idCount = allocateIdBlockFromReleasedIdsHolder(localPoolCache, releasedIdsBuilderParent, parentIdPool, tx);
454 idCount = allocateIdBlockFromAvailableIdsHolder(localPoolCache, parentIdPool, tx);
// Moves ids of other child pools into the parent's released-ids builder.
458 idCount = getIdsFromOtherChildPools(releasedIdsBuilderParent, parentIdPool);
460 if (LOG.isDebugEnabled()) {
461 LOG.debug("Unable to allocate Id block from global pool");
463 throw new IdManagerException(String.format("Ids exhausted for pool : %s", parentIdPool.getPoolName()));
/**
 * Reclaims ids from another child pool of the given parent: the child's delayed (released)
 * entries and its still-unused available ids are appended to the parent's released-ids
 * builder, and the emptied child pool is written back to the CONFIGURATION datastore.
 *
 * @param releasedIdsBuilderParent builder of the parent's released-ids holder; updated in place
 * @param parentIdPool datastore state of the parent pool; its child pool list is sorted in place
 * @return the count computed for the reclaimed child pool
 * @throws OperationFailedException on datastore failures
 */
468 private long getIdsFromOtherChildPools(ReleasedIdsHolderBuilder releasedIdsBuilderParent, IdPool parentIdPool)
469 throws OperationFailedException {
470 List<ChildPools> childPoolsList = parentIdPool.getChildPools();
471 // Sorting the child pools on last accessed time so that the pool that
472 // was not accessed for a long time comes first.
473 Collections.sort(childPoolsList, comparing(ChildPools::getLastAccessTime));
// Current time in seconds, the unit DEFAULT_IDLE_TIME is expressed in.
474 long currentTime = System.currentTimeMillis() / 1000;
475 for (ChildPools childPools : childPoolsList) {
// True when the child pool was accessed within the last DEFAULT_IDLE_TIME seconds.
// NOTE(review): the body of this branch is not visible in this excerpt.
476 if (childPools.getLastAccessTime() + DEFAULT_IDLE_TIME > currentTime) {
// Never reclaim from this node's own local pool.
479 if (!childPools.getChildPoolName().equals(idUtils.getLocalPoolName(parentIdPool.getPoolName()))) {
480 InstanceIdentifier<IdPool> idPoolInstanceIdentifier = idUtils
481 .getIdPoolInstance(childPools.getChildPoolName());
482 IdPool otherChildPool = singleTxDB.syncRead(CONFIGURATION, idPoolInstanceIdentifier);
483 ReleasedIdsHolderBuilder releasedIds = idUtils.getReleaseIdsHolderBuilder(otherChildPool);
485 List<DelayedIdEntries> delayedIdEntriesChild = releasedIds.getDelayedIdEntries();
486 List<DelayedIdEntries> delayedIdEntriesParent = releasedIdsBuilderParent.getDelayedIdEntries();
487 if (delayedIdEntriesParent == null) {
488 delayedIdEntriesParent = new LinkedList<>();
// Move all delayed entries of the child to the parent, then empty the child's list.
490 delayedIdEntriesParent.addAll(delayedIdEntriesChild);
491 delayedIdEntriesChild.removeAll(delayedIdEntriesChild);
// Drain the child's unused available ids: each one becomes a delayed entry of the parent
// with the current time as its ready time, and the child's cursor is advanced past it.
493 AvailableIdsHolderBuilder availableIds = idUtils.getAvailableIdsHolderBuilder(otherChildPool);
494 while (idUtils.isIdAvailable(availableIds)) {
495 long cursor = availableIds.getCursor() + 1;
496 delayedIdEntriesParent.add(idUtils.createDelayedIdEntry(cursor, currentTime));
497 availableIds.setCursor(cursor);
// NOTE(review): if getDelayedIdEntries() returns the same list instance that was emptied by
// removeAll above, its size() is 0 here and the moved entries are not counted - confirm
// that this is intended.
500 long totalAvailableIdCount = releasedIds.getDelayedIdEntries().size()
501 + idUtils.getAvailableIdsCount(availableIds);
502 long count = releasedIdsBuilderParent.getAvailableIdCount() + totalAvailableIdCount;
503 releasedIdsBuilderParent.setDelayedIdEntries(delayedIdEntriesParent).setAvailableIdCount(count);
// Persist the drained child pool.
504 singleTxDB.syncUpdate(CONFIGURATION, idPoolInstanceIdentifier,
505 new IdPoolBuilder().setKey(new IdPoolKey(otherChildPool.getPoolName()))
506 .setAvailableIdsHolder(availableIds.build()).setReleasedIdsHolder(releasedIds.build())
508 return totalAvailableIdCount;
/**
 * Moves up to one block of delayed (released) id entries from the parent pool into the
 * released-ids holder of the cached local pool and merges the reduced parent holder into
 * the given transaction.
 *
 * @param localIdPool cached local pool receiving the entries
 * @param releasedIdsBuilderParent builder of the parent's released-ids holder; updated in place
 * @param parentIdPool datastore state of the parent pool (supplies name and block size)
 * @param tx transaction that receives the parent-side update
 */
514 private long allocateIdBlockFromReleasedIdsHolder(IdLocalPool localIdPool,
515 ReleasedIdsHolderBuilder releasedIdsBuilderParent, IdPool parentIdPool, WriteTransaction tx) {
// Nothing to take when the parent reports no released ids.
516 if (releasedIdsBuilderParent.getAvailableIdCount() == 0) {
517 LOG.debug("Ids unavailable in releasedIds of parent pool {}", parentIdPool);
520 List<DelayedIdEntries> delayedIdEntriesParent = releasedIdsBuilderParent.getDelayedIdEntries();
// Take at most one block, or fewer if the parent holds fewer entries.
521 int idCount = Math.min(delayedIdEntriesParent.size(), parentIdPool.getBlockSize());
522 List<DelayedIdEntries> idEntriesToBeRemoved = delayedIdEntriesParent.subList(0, idCount);
523 ReleasedIdHolder releasedIds = (ReleasedIdHolder) localIdPool.getReleasedIds();
524 List<DelayedIdEntry> delayedIdEntriesLocalCache = releasedIds.getDelayedEntries();
// Convert the taken entries to cache entries ordered by ascending ready time.
525 List<DelayedIdEntry> delayedIdEntriesFromParentPool = idEntriesToBeRemoved
527 .map(delayedIdEntry -> new DelayedIdEntry(delayedIdEntry
528 .getId(), delayedIdEntry.getReadyTimeSec()))
529 .sorted(comparing(DelayedIdEntry::getReadyTimeSec))
530 .collect(toCollection(CopyOnWriteArrayList::new));
// The entries already cached locally are appended after the ones taken from the parent.
531 delayedIdEntriesFromParentPool.addAll(delayedIdEntriesLocalCache);
532 releasedIds.setDelayedEntries(delayedIdEntriesFromParentPool);
533 releasedIds.setAvailableIdCount(releasedIds.getAvailableIdCount() + idCount);
534 localIdPool.setReleasedIds(releasedIds);
// NOTE(review): idEntriesToBeRemoved is a subList view of the very list being modified
// here - confirm this is safe for the list implementation in use.
535 delayedIdEntriesParent.removeAll(idEntriesToBeRemoved);
536 releasedIdsBuilderParent.setDelayedIdEntries(delayedIdEntriesParent);
537 InstanceIdentifier<ReleasedIdsHolder> releasedIdsHolderInstanceIdentifier = InstanceIdentifier
538 .builder(IdPools.class).child(IdPool.class,
539 new IdPoolKey(parentIdPool.getPoolName())).child(ReleasedIdsHolder.class).build();
540 releasedIdsBuilderParent.setAvailableIdCount(releasedIdsBuilderParent.getAvailableIdCount() - idCount);
541 LOG.debug("Allocated {} ids from releasedIds of parent pool {}", idCount, parentIdPool);
542 tx.merge(CONFIGURATION, releasedIdsHolderInstanceIdentifier,
543 releasedIdsBuilderParent.build(), true);
/**
 * Carves a block of not-yet-used ids out of the parent pool's available range, installs it
 * as the available-ids holder of the cached local pool and merges the advanced parent
 * cursor into the given transaction.
 *
 * @param localIdPool cached local pool receiving the block
 * @param parentIdPool datastore state of the parent pool (supplies range, cursor and block size)
 * @param tx transaction that receives the parent-side update
 */
547 private long allocateIdBlockFromAvailableIdsHolder(IdLocalPool localIdPool, IdPool parentIdPool,
548 WriteTransaction tx) {
550 AvailableIdsHolderBuilder availableIdsBuilderParent = idUtils.getAvailableIdsHolderBuilder(parentIdPool);
551 long end = availableIdsBuilderParent.getEnd();
552 long cur = availableIdsBuilderParent.getCursor();
553 if (!idUtils.isIdAvailable(availableIdsBuilderParent)) {
554 if (LOG.isDebugEnabled()) {
555 LOG.debug("Ids exhausted in parent pool {}", parentIdPool);
559 // Update availableIdsHolder of Local Pool
// Block is one block size, or whatever is left between the cursor and the end of the range.
560 idCount = Math.min(end - cur, parentIdPool.getBlockSize());
// The local pool receives the ids cur + 1 .. cur + idCount.
561 AvailableIdHolder availableIds = new AvailableIdHolder(idUtils, cur + 1, cur + idCount);
562 localIdPool.setAvailableIds(availableIds);
563 // Update availableIdsHolder of Global Pool
564 InstanceIdentifier<AvailableIdsHolder> availableIdsHolderInstanceIdentifier = InstanceIdentifier
565 .builder(IdPools.class).child(IdPool.class,
566 new IdPoolKey(parentIdPool.getPoolName())).child(AvailableIdsHolder.class).build();
567 availableIdsBuilderParent.setCursor(cur + idCount);
568 if (LOG.isDebugEnabled()) {
569 LOG.debug("Allocated {} ids from availableIds of global pool {}", idCount, parentIdPool);
571 tx.merge(CONFIGURATION, availableIdsHolderInstanceIdentifier,
572 availableIdsBuilderParent.build(), true);
/**
 * Releases the ids stored under {@code idKey} in the parent pool: the ids are added to the
 * released-ids holder of the cached local pool, and jobs are enqueued to sync that holder and
 * to update the parent's id entry in the datastore. A delayed clean-up task is scheduled too.
 *
 * @param parentPoolName name of the parent pool holding the id entry
 * @param localPoolName name of this node's local pool
 * @param idKey key whose ids are released
 * @throws ReadFailedException if a datastore read fails
 * @throws IdManagerException ({@link IdDoesNotExistException}) if the parent pool has no id entries
 */
576 private void releaseIdFromLocalPool(String parentPoolName, String localPoolName, String idKey)
577 throws ReadFailedException, IdManagerException {
578 String idLatchKey = idUtils.getUniqueKey(parentPoolName, idKey);
579 LOG.debug("Releasing ID {} from pool {}", idKey, localPoolName);
// A latch is registered for the key by allocateIdFromLocalPool.
580 CountDownLatch latch = idUtils.releaseIdLatchMap.get(idLatchKey);
// Wait at most 10 seconds on that latch before releasing.
583 latch.await(10, TimeUnit.SECONDS);
584 } catch (InterruptedException ignored) {
585 LOG.warn("Thread interrupted while releasing id {} from id pool {}", idKey, parentPoolName);
587 idUtils.releaseIdLatchMap.remove(idLatchKey);
590 localPoolName = localPoolName.intern();
591 InstanceIdentifier<IdPool> parentIdPoolInstanceIdentifier = idUtils.getIdPoolInstance(parentPoolName);
592 IdPool parentIdPool = singleTxDB.syncRead(CONFIGURATION, parentIdPoolInstanceIdentifier);
593 List<IdEntries> idEntries = parentIdPool.getIdEntries();
// Alias of idEntries (same list instance), not a copy.
594 List<IdEntries> newIdEntries = idEntries;
595 if (idEntries == null) {
596 throw new IdDoesNotExistException(parentPoolName, idKey);
598 InstanceIdentifier<IdEntries> existingId = idUtils.getIdEntry(parentIdPoolInstanceIdentifier, idKey);
599 Optional<IdEntries> existingIdEntryObject = singleTxDB.syncReadOptional(CONFIGURATION, existingId);
// Unknown key: log it and release the lock on the pool/key pair.
600 if (!existingIdEntryObject.isPresent()) {
601 LOG.info("Specified Id key {} does not exist in id pool {}", idKey, parentPoolName);
602 idUtils.unlock(lockManager, idLatchKey);
605 IdEntries existingIdEntry = existingIdEntryObject.get();
606 List<Long> idValuesList = existingIdEntry.getIdValue();
// The local cache is keyed by the parent pool name.
607 IdLocalPool localIdPoolCache = localPool.get(parentPoolName);
608 boolean isRemoved = newIdEntries.remove(existingIdEntry);
609 LOG.debug("The entry {} is removed {}", existingIdEntry, isRemoved);
// Put the released ids into the local pool's released-ids holder.
610 updateDelayedEntriesInLocalCache(idValuesList, parentPoolName, localIdPoolCache);
611 IdHolderSyncJob poolSyncJob = new IdHolderSyncJob(localPoolName, localIdPoolCache.getReleasedIds(), broker,
613 DataStoreJobCoordinator.getInstance().enqueueJob(localPoolName, poolSyncJob, IdUtils.RETRY_COUNT);
614 scheduleCleanUpTask(localIdPoolCache, parentPoolName, parentIdPool.getBlockSize());
615 LOG.debug("Released id ({}, {}) from pool {}", idKey, idValuesList, localPoolName);
616 // Updating id entries in the parent pool. This will be used for restart scenario
// A null id list is passed to the job here, in contrast to the allocation path.
617 UpdateIdEntryJob job = new UpdateIdEntryJob(parentPoolName, localPoolName, idKey, null, broker, idUtils,
619 DataStoreJobCoordinator.getInstance().enqueueJob(parentPoolName, job, IdUtils.RETRY_COUNT);
/**
 * Schedules a one-shot timer task that enqueues a {@link CleanUpJob} for the given local pool
 * on the {@link DataStoreJobCoordinator}.
 *
 * @param localIdPoolCache cached local pool to clean up
 * @param parentPoolName name of its parent pool
 * @param blockSize block size of the parent pool
 */
622 private void scheduleCleanUpTask(final IdLocalPool localIdPoolCache,
623 final String parentPoolName, final int blockSize) {
624 TimerTask scheduledTask = new TimerTask() {
627 CleanUpJob job = new CleanUpJob(localIdPoolCache, broker, parentPoolName, blockSize, lockManager,
629 DataStoreJobCoordinator.getInstance().enqueueJob(localIdPoolCache.getPoolName(), job,
630 IdUtils.RETRY_COUNT);
// Timer.schedule takes milliseconds; the constant is multiplied by 1000, so it is
// presumably expressed in seconds - confirm against IdUtils.
633 cleanJobTimer.schedule(scheduledTask, IdUtils.DEFAULT_DELAY_TIME * 1000);
/**
 * Looks up the named global pool in the CONFIGURATION datastore. If it does not exist, a new
 * pool is built by IdUtils and put into the given transaction; otherwise the existing pool
 * is used unchanged.
 *
 * @param tx transaction that receives the new pool, if one is created
 * @param poolName name of the global pool
 * @param low lower bound of the id range
 * @param high upper bound of the id range
 * @param blockSize block size of the pool
 * @throws ReadFailedException if the datastore read fails
 */
636 private IdPool createGlobalPool(WriteTransaction tx, String poolName, long low, long high, long blockSize)
637 throws ReadFailedException {
639 InstanceIdentifier<IdPool> idPoolInstanceIdentifier = idUtils.getIdPoolInstance(poolName);
640 Optional<IdPool> existingIdPool = singleTxDB.syncReadOptional(CONFIGURATION, idPoolInstanceIdentifier);
641 if (!existingIdPool.isPresent()) {
642 if (LOG.isDebugEnabled()) {
643 LOG.debug("Creating new global pool {}", poolName);
645 idPool = idUtils.createGlobalPool(poolName, low, high, blockSize);
// The final argument 'true' asks put to create missing parent nodes.
646 tx.put(CONFIGURATION, idPoolInstanceIdentifier, idPool, true);
648 idPool = existingIdPool.get();
649 if (LOG.isDebugEnabled()) {
650 LOG.debug("GlobalPool exists {}", idPool);
/**
 * Creates a new cached local pool for the given parent pool, fills it with a first id block
 * from the parent, stores it in the local cache under the parent pool name and enqueues a
 * {@link LocalPoolCreateJob} for it.
 *
 * @param tx transaction that receives the parent-side updates of the block allocation
 * @param localPoolName name of the local pool to create
 * @param idPool datastore state of the parent pool
 * @throws OperationFailedException on datastore failures
 * @throws IdManagerException if no id block can be allocated
 */
656 private IdLocalPool createLocalPool(WriteTransaction tx, String localPoolName, IdPool idPool)
657 throws OperationFailedException, IdManagerException {
658 localPoolName = localPoolName.intern();
659 IdLocalPool idLocalPool = new IdLocalPool(idUtils, localPoolName);
660 allocateIdBlockFromParentPool(idLocalPool, idPool, tx);
661 String parentPool = idPool.getPoolName();
662 localPool.put(parentPool, idLocalPool);
663 LocalPoolCreateJob job = new LocalPoolCreateJob(idLocalPool, broker, idPool.getPoolName(),
664 idPool.getBlockSize(), idUtils);
665 DataStoreJobCoordinator.getInstance().enqueueJob(localPoolName, job, IdUtils.RETRY_COUNT);
/**
 * Enqueues a {@link LocalPoolDeleteJob} for the named pool on the
 * {@link DataStoreJobCoordinator}, keyed by the pool name.
 *
 * @param poolName name of the pool to delete
 */
669 private void deletePool(String poolName) {
670 LocalPoolDeleteJob job = new LocalPoolDeleteJob(poolName, broker, idUtils);
671 DataStoreJobCoordinator.getInstance().enqueueJob(poolName, job, IdUtils.RETRY_COUNT);
/**
 * Drops the cached local pool of {@code parentPoolName}, but only if the cached pool's name
 * equals {@code poolName}.
 *
 * @param parentPoolName key of the cache entry
 * @param poolName name of the pool that was deleted
 */
674 public void poolDeleted(String parentPoolName, String poolName) {
675 IdLocalPool idLocalPool = localPool.get(parentPoolName);
676 if (idLocalPool != null) {
677 if (idLocalPool.getPoolName().equals(poolName)) {
678 localPool.remove(parentPoolName);
/**
 * Adds every id of the list to the released-ids holder of the given local pool and stores
 * the pool (again) in the local cache under the parent pool name.
 *
 * @param idsList ids to hand back
 * @param parentPoolName key of the cache entry
 * @param localPoolCache cached local pool receiving the ids
 */
683 private void updateDelayedEntriesInLocalCache(List<Long> idsList, String parentPoolName,
684 IdLocalPool localPoolCache) {
685 for (long idValue : idsList) {
686 localPoolCache.getReleasedIds().addId(idValue);
688 localPool.put(parentPoolName, localPoolCache);
/**
 * Looks up the cached local pool of the given parent pool.
 *
 * @param parentPoolName key of the cache entry
 * @return an Optional holding the result of {@code IdLocalPool.deepCopyOf} for the cached
 *         pool, or an empty Optional if nothing is cached for that parent
 */
// java.util.Optional is spelled out because the file imports Guava's Optional.
691 public java.util.Optional<IdLocalPool> getIdLocalPool(String parentPoolName) {
692 return java.util.Optional.ofNullable(localPool.get(parentPoolName)).map(IdLocalPool::deepCopyOf);
/**
 * Takes the lock on {@code uniqueIdKey} and checks whether the parent pool already has an id
 * entry for {@code idKey} in the CONFIGURATION datastore.
 *
 * <p>If an entry exists, its ids are published through {@code futureIdValues}, the lock is
 * released again and the ids are returned. If no entry exists, an empty list is returned and
 * the lock is NOT released by this method.
 *
 * @param parentPoolName name of the parent pool
 * @param idKey key to look up
 * @param uniqueIdKey pool/key identifier used for locking and for the allocation map
 * @param futureIdValues future registered by the calling thread
 * @param existingFutureIdValue future that was already registered by another thread, or null
 * @return the existing ids, or an empty list if the key has none
 * @throws IdManagerException declared by the signature
 * @throws ReadFailedException if the datastore read fails
 */
695 private List<Long> checkForIdInIdEntries(String parentPoolName, String idKey, String uniqueIdKey,
696 CompletableFuture<List<Long>> futureIdValues, CompletableFuture<List<Long>> existingFutureIdValue)
697 throws IdManagerException, ReadFailedException {
698 InstanceIdentifier<IdPool> parentIdPoolInstanceIdentifier = idUtils.getIdPoolInstance(parentPoolName);
699 InstanceIdentifier<IdEntries> existingId = idUtils.getIdEntry(parentIdPoolInstanceIdentifier, idKey);
700 idUtils.lock(lockManager, uniqueIdKey);
701 List<Long> newIdValuesList = new ArrayList<>();
702 Optional<IdEntries> existingIdEntry = singleTxDB.syncReadOptional(CONFIGURATION, existingId);
703 if (existingIdEntry.isPresent()) {
704 newIdValuesList = existingIdEntry.get().getIdValue();
705 LOG.debug("Existing ids {} for the key {} ", newIdValuesList, idKey);
706 // Inform other waiting threads about this new value.
707 futureIdValues.complete(newIdValuesList);
708 // This is to avoid stale entries in the map. If this thread had populated the map,
709 // then the entry should be removed.
710 if (existingFutureIdValue == null) {
711 idUtils.allocatedIdMap.remove(uniqueIdKey);
713 idUtils.unlock(lockManager, uniqueIdKey);
714 return newIdValuesList;
// No entry: returns the empty list while still holding the lock on uniqueIdKey.
716 return newIdValuesList;
/**
 * Fetches the cached local pool of the given parent pool, creating it if it is not cached.
 *
 * <p>Creation happens under the lock on the parent pool name and re-checks the cache after
 * the lock is taken, so a pool created by a thread that held the lock earlier is reused.
 *
 * @param parentPoolName name of the parent pool (cache key and lock name)
 * @param localPoolName name to give a newly created local pool
 * @throws IdManagerException if no id block can be allocated for a new pool
 * @throws ReadFailedException if reading the parent pool fails
 * @throws OperationFailedException on other datastore failures
 * @throws TransactionCommitFailedException if committing the new pool fails
 */
719 private IdLocalPool getOrCreateLocalIdPool(String parentPoolName, String localPoolName)
720 throws IdManagerException, ReadFailedException, OperationFailedException, TransactionCommitFailedException {
721 IdLocalPool localIdPool = localPool.get(parentPoolName);
722 if (localIdPool == null) {
723 idUtils.lock(lockManager, parentPoolName);
725 // Check if a previous thread that got the cluster-wide lock
726 // first, has created the localPool
727 if (localPool.get(parentPoolName) == null) {
728 WriteTransaction tx = broker.newWriteOnlyTransaction();
729 InstanceIdentifier<IdPool> parentIdPoolInstanceIdentifier = idUtils
730 .getIdPoolInstance(parentPoolName);
731 IdPool parentIdPool = singleTxDB.syncRead(CONFIGURATION, parentIdPoolInstanceIdentifier);
732 // Return localIdPool
733 localIdPool = createLocalPool(tx, localPoolName, parentIdPool);
734 tx.submit().checkedGet();
// Another thread created the pool in the meantime: use that one.
736 localIdPool = localPool.get(parentPoolName);
739 idUtils.unlock(lockManager, parentPoolName);
/**
 * Allocates several ids from the local pool into {@code newIdValuesList}.
 *
 * <p>The request is only attempted when the number of ids obtainable (local available +
 * local released + parent released + parent available) is strictly greater than
 * {@code size}; otherwise an {@link IdManagerException} is thrown.
 *
 * @param parentPoolName name of the parent pool
 * @param localPoolName name of the local pool (used for logging)
 * @param size number of ids requested
 * @param newIdValuesList list that receives the allocated ids
 * @param localIdPool cached local pool to allocate from
 * @param newIdValue scratch variable holding the most recently allocated id
 * @throws ReadFailedException if reading the parent pool fails
 * @throws IdManagerException with message "Ids exhausted for pool : ..." when not enough ids exist
 */
745 private void getRangeOfIds(String parentPoolName, String localPoolName, long size, List<Long> newIdValuesList,
746 IdLocalPool localIdPool, long newIdValue) throws ReadFailedException, IdManagerException {
747 InstanceIdentifier<IdPool> parentIdPoolInstanceIdentifier1 = idUtils.getIdPoolInstance(parentPoolName);
748 IdPool parentIdPool = singleTxDB.syncRead(CONFIGURATION, parentIdPoolInstanceIdentifier1);
// Ids currently held by the local pool ...
749 long totalAvailableIdCount = localIdPool.getAvailableIds().getAvailableIdCount()
750 + localIdPool.getReleasedIds().getAvailableIdCount();
751 AvailableIdsHolderBuilder availableParentIds = idUtils.getAvailableIdsHolderBuilder(parentIdPool);
752 ReleasedIdsHolderBuilder releasedParentIds = idUtils.getReleaseIdsHolderBuilder(parentIdPool);
// ... plus the ids still held by the parent pool.
753 totalAvailableIdCount = totalAvailableIdCount + releasedParentIds.getAvailableIdCount()
754 + idUtils.getAvailableIdsCount(availableParentIds);
755 if (totalAvailableIdCount > size) {
// NOTE(review): the loop that repeats the allocation below is not visible in this excerpt.
758 newIdValue = getIdFromLocalPoolCache(localIdPool, parentPoolName);
759 } catch (OperationFailedException e) {
760 if (LOG.isDebugEnabled()) {
761 LOG.debug("Releasing IDs to pool {}", localPoolName);
763 // Releasing the IDs added in newIdValuesList since
764 // a null list would be returned now, as the
765 // requested size of list IDs exceeds the number of
767 updateDelayedEntriesInLocalCache(newIdValuesList, parentPoolName, localIdPool);
769 newIdValuesList.add(newIdValue);
773 throw new IdManagerException(String.format("Ids exhausted for pool : %s", parentPoolName));