2 * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.genius.idmanager;
10 import static java.util.Comparator.comparing;
11 import static java.util.stream.Collectors.toCollection;
12 import static org.opendaylight.genius.infra.Datastore.CONFIGURATION;
13 import static org.opendaylight.yangtools.yang.binding.CodeHelpers.nonnull;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import java.util.ArrayList;
19 import java.util.Collections;
20 import java.util.HashMap;
21 import java.util.LinkedList;
22 import java.util.List;
24 import java.util.Objects;
25 import java.util.Optional;
26 import java.util.Timer;
27 import java.util.TimerTask;
28 import java.util.concurrent.CompletableFuture;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.ConcurrentMap;
31 import java.util.concurrent.CountDownLatch;
32 import java.util.concurrent.ExecutionException;
33 import java.util.concurrent.TimeUnit;
34 import java.util.stream.Collectors;
35 import javax.annotation.PostConstruct;
36 import javax.annotation.PreDestroy;
37 import javax.inject.Inject;
38 import javax.inject.Singleton;
39 import org.apache.aries.blueprint.annotation.service.Reference;
40 import org.eclipse.jdt.annotation.Nullable;
41 import org.opendaylight.daexim.DataImportBootReady;
42 import org.opendaylight.genius.datastoreutils.ExpectedDataObjectNotFoundException;
43 import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
44 import org.opendaylight.genius.idmanager.ReleasedIdHolder.DelayedIdEntry;
45 import org.opendaylight.genius.idmanager.api.IdManagerMonitor;
46 import org.opendaylight.genius.idmanager.jobs.CleanUpJob;
47 import org.opendaylight.genius.idmanager.jobs.IdHolderSyncJob;
48 import org.opendaylight.genius.idmanager.jobs.LocalPoolCreateJob;
49 import org.opendaylight.genius.idmanager.jobs.LocalPoolDeleteJob;
50 import org.opendaylight.genius.idmanager.jobs.UpdateIdEntryJob;
51 import org.opendaylight.genius.infra.Datastore.Configuration;
52 import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
53 import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
54 import org.opendaylight.genius.infra.TypedReadWriteTransaction;
55 import org.opendaylight.genius.infra.TypedWriteTransaction;
56 import org.opendaylight.infrautils.jobcoordinator.JobCoordinator;
57 import org.opendaylight.mdsal.binding.api.DataBroker;
58 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
59 import org.opendaylight.mdsal.common.api.ReadFailedException;
60 import org.opendaylight.serviceutils.tools.rpc.FutureRpcResults;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdInput;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdOutput;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdOutputBuilder;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdRangeInput;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdRangeOutput;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdRangeOutputBuilder;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.CreateIdPoolInput;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.CreateIdPoolOutput;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.CreateIdPoolOutputBuilder;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.DeleteIdPoolInput;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.DeleteIdPoolOutput;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.IdManagerService;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.IdPools;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.ReleaseIdInput;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.ReleaseIdOutput;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.ReleaseIdOutputBuilder;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.IdPool;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.IdPoolBuilder;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.IdPoolKey;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.AvailableIdsHolder;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.AvailableIdsHolderBuilder;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.ChildPools;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.ChildPoolsKey;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.IdEntries;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.IdEntriesKey;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.ReleasedIdsHolder;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.ReleasedIdsHolderBuilder;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.released.ids.DelayedIdEntries;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.LockManagerService;
90 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
91 import org.opendaylight.yangtools.yang.common.OperationFailedException;
92 import org.opendaylight.yangtools.yang.common.RpcResult;
93 import org.opendaylight.yangtools.yang.common.Uint32;
94 import org.slf4j.Logger;
95 import org.slf4j.LoggerFactory;
98 public class IdManager implements IdManagerService, IdManagerMonitor {
100 private static final Logger LOG = LoggerFactory.getLogger(IdManager.class);
101 private static final long DEFAULT_IDLE_TIME = 24 * 60 * 60;
103 private final DataBroker broker;
104 private final ManagedNewTransactionRunner txRunner;
105 private final SingleTransactionDataBroker singleTxDB;
106 private final LockManagerService lockManager;
107 private final IdUtils idUtils;
108 private final JobCoordinator jobCoordinator;
110 private final ConcurrentMap<String, IdLocalPool> localPool;
111 private final Timer cleanJobTimer = new Timer();
114 public IdManager(DataBroker db, LockManagerService lockManager, IdUtils idUtils,
115 @Reference DataImportBootReady dataImportBootReady, @Reference JobCoordinator jobCoordinator)
116 throws ReadFailedException, InterruptedException {
118 this.txRunner = new ManagedNewTransactionRunnerImpl(db);
119 this.singleTxDB = new SingleTransactionDataBroker(db);
120 this.lockManager = lockManager;
121 this.idUtils = idUtils;
122 this.jobCoordinator = jobCoordinator;
124 // NB: We do not "use" the DataImportBootReady, but it's presence in the OSGi
125 // Service Registry is the required "signal" that the Daexim "import on boot"
126 // has fully completed (which we want to wait for). Therefore, making this
127 // dependent on that defers the Blueprint initialization, as we'd like to,
128 // so that we do not start giving out new IDs before an import went in.
129 // Thus, please DO NOT remove the DataImportBootReady argument, even if
130 // it appears to be (is) un-used from a Java code PoV!
132 this.localPool = new ConcurrentHashMap<>();
137 public Map<String, String> getLocalPoolsDetails() {
138 Map<String, String> map = new HashMap<>();
139 localPool.forEach((key, value) -> map.put(key, value.toString()));
144 public void start() {
145 LOG.info("{} start", getClass().getSimpleName());
149 public void close() {
150 cleanJobTimer.cancel();
152 LOG.info("{} close", getClass().getSimpleName());
155 private void populateCache() throws InterruptedException {
156 // If IP changes during reboot, then there will be orphaned child pools.
157 InstanceIdentifier<IdPools> idPoolsInstance = idUtils.getIdPools();
158 Optional<IdPools> idPoolsOptional;
161 idPoolsOptional = singleTxDB.syncReadOptional(LogicalDatastoreType.CONFIGURATION, idPoolsInstance);
163 } catch (ExecutionException e) {
164 LOG.error("Failed to read the id pools due to error. Retrying again...", e);
168 if (!idPoolsOptional.isPresent()) {
171 idPoolsOptional.get().nonnullIdPool().values()
173 .filter(idPool -> idPool.getParentPoolName() != null
174 && !idPool.getParentPoolName().isEmpty()
175 && idUtils.getLocalPoolName(idPool.getParentPoolName())
176 .equals(idPool.getPoolName()))
178 idPool -> updateLocalIdPoolCache(idPool,
179 idPool.getParentPoolName()));
182 public boolean updateLocalIdPoolCache(IdPool idPool, String parentPoolName) {
183 AvailableIdsHolder availableIdsHolder = idPool.getAvailableIdsHolder();
184 AvailableIdHolder availableIdHolder = new AvailableIdHolder(idUtils, availableIdsHolder.getStart().toJava(),
185 availableIdsHolder.getEnd().toJava());
186 availableIdHolder.setCur(availableIdsHolder.getCursor());
187 ReleasedIdsHolder releasedIdsHolder = idPool.getReleasedIdsHolder();
188 ReleasedIdHolder releasedIdHolder = new ReleasedIdHolder(idUtils,
189 releasedIdsHolder.getDelayedTimeSec().toJava());
190 releasedIdHolder.setAvailableIdCount(releasedIdsHolder.getAvailableIdCount().toJava());
191 List<DelayedIdEntry> delayedIdEntryInCache = releasedIdsHolder.nonnullDelayedIdEntries()
193 .map(delayedIdEntry -> new DelayedIdEntry(delayedIdEntry
194 .getId().toJava(), delayedIdEntry.getReadyTimeSec().toJava()))
195 .sorted(comparing(DelayedIdEntry::getReadyTimeSec))
196 .collect(toCollection(ArrayList::new));
198 releasedIdHolder.replaceDelayedEntries(delayedIdEntryInCache);
200 IdLocalPool idLocalPool = new IdLocalPool(idUtils, idPool.getPoolName());
201 idLocalPool.setAvailableIds(availableIdHolder);
202 idLocalPool.setReleasedIds(releasedIdHolder);
203 localPool.put(parentPoolName, idLocalPool);
204 if (LOG.isDebugEnabled()) {
205 LOG.debug("Populating cache for {} with {}", idLocalPool.getPoolName(), idLocalPool);
211 public ListenableFuture<RpcResult<CreateIdPoolOutput>> createIdPool(CreateIdPoolInput input) {
212 LOG.info("createIdPool called with input {}", input);
213 long low = input.getLow().toJava();
214 long high = input.getHigh().toJava();
215 long blockSize = idUtils.computeBlockSize(low, high);
216 return FutureRpcResults.fromListenableFuture(LOG, "createIdPool", input, () -> {
217 String poolName = input.getPoolName().intern();
219 idUtils.lock(lockManager, poolName);
220 return Futures.transform(txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION, confTx -> {
221 IdPool idPool = createGlobalPool(confTx, poolName, low, high, blockSize);
222 String localPoolName = idUtils.getLocalPoolName(poolName);
223 IdLocalPool idLocalPool = localPool.get(poolName);
224 if (idLocalPool == null) {
225 createLocalPool(confTx, localPoolName, idPool);
226 idUtils.updateChildPool(confTx, idPool.getPoolName(), localPoolName);
228 }), unused -> new CreateIdPoolOutputBuilder().build(), MoreExecutors.directExecutor());
230 idUtils.unlock(lockManager, poolName);
236 public ListenableFuture<RpcResult<AllocateIdOutput>> allocateId(AllocateIdInput input) {
237 String idKey = input.getIdKey();
238 String poolName = input.getPoolName();
239 return FutureRpcResults.fromBuilder(LOG, "allocateId", input, () -> {
240 String localPoolName = idUtils.getLocalPoolName(poolName);
241 // allocateIdFromLocalPool method returns a list of IDs with one element. This element is obtained by get(0)
242 long newIdValue = allocateIdFromLocalPool(poolName, localPoolName, idKey, 1).get(0).toJava();
243 return new AllocateIdOutputBuilder().setIdValue(newIdValue);
244 }).onFailure(e -> completeExceptionallyIfPresent(poolName, idKey, e)).build();
247 private void completeExceptionallyIfPresent(String poolName, String idKey, Throwable exception) {
248 CompletableFuture<List<Uint32>> completableFuture =
249 idUtils.removeAllocatedIds(idUtils.getUniqueKey(poolName, idKey));
250 if (completableFuture != null) {
251 completableFuture.completeExceptionally(exception);
256 public ListenableFuture<RpcResult<AllocateIdRangeOutput>> allocateIdRange(AllocateIdRangeInput input) {
257 String idKey = input.getIdKey();
258 String poolName = input.getPoolName();
259 long size = input.getSize().toJava();
260 String localPoolName = idUtils.getLocalPoolName(poolName);
261 AllocateIdRangeOutputBuilder output = new AllocateIdRangeOutputBuilder();
262 return FutureRpcResults.fromBuilder(LOG, "allocateIdRange", input, () -> {
263 List<Uint32> newIdValuesList = allocateIdFromLocalPool(poolName, localPoolName, idKey, size);
264 Collections.sort(newIdValuesList);
265 output.setIdValues(newIdValuesList);
267 }).onFailure(e -> completeExceptionallyIfPresent(poolName, idKey, e)).build();
271 public ListenableFuture<RpcResult<DeleteIdPoolOutput>> deleteIdPool(DeleteIdPoolInput input) {
272 return FutureRpcResults.fromListenableFuture(LOG, "deleteIdPool", input, () -> {
273 String poolName = input.getPoolName().intern();
274 InstanceIdentifier<IdPool> idPoolToBeDeleted = idUtils.getIdPoolInstance(poolName);
275 synchronized (poolName) {
276 IdPool idPool = singleTxDB.syncRead(LogicalDatastoreType.CONFIGURATION, idPoolToBeDeleted);
277 //List<ChildPools> childPoolList = idPool.getChildPools();
280 @Nullable Map<ChildPoolsKey, ChildPools> childPoolList = idPool.getChildPools();
281 if (childPoolList != null) {
282 childPoolList.forEach((childPool, childPools) -> {
283 deletePool(childPool.getChildPoolName());
286 singleTxDB.syncDelete(LogicalDatastoreType.CONFIGURATION, idPoolToBeDeleted);
288 // TODO return the Future from a TBD asyncDelete instead.. BUT check that all callers @CheckReturnValue
289 return Futures.immediateFuture((DeleteIdPoolOutput) null);
294 public ListenableFuture<RpcResult<ReleaseIdOutput>> releaseId(ReleaseIdInput input) {
295 String poolName = input.getPoolName();
296 String idKey = input.getIdKey();
297 String uniqueKey = idUtils.getUniqueKey(poolName, idKey);
298 return FutureRpcResults.fromBuilder(LOG, "releaseId", input, () -> {
299 idUtils.lock(lockManager, uniqueKey);
300 return releaseIdFromLocalPool(poolName, idUtils.getLocalPoolName(poolName), idKey);
301 }).onFailureLogLevel(FutureRpcResults.LogLevel.NONE)
303 if (e instanceof IdDoesNotExistException) {
304 // Do not log full stack trace in case ID does not exist
305 LOG.error("RPC releaseId() failed due to IdDoesNotExistException; input = {}", input);
307 // But for all other cases do:
308 LOG.error("RPC releaseId() failed; input = {}", input, e);
310 idUtils.unlock(lockManager, uniqueKey);
314 private List<Uint32> allocateIdFromLocalPool(String parentPoolName, String localPoolName,
315 String idKey, long size) throws OperationFailedException, IdManagerException, ExecutionException,
316 InterruptedException {
317 LOG.debug("Allocating id from local pool {}. Parent pool {}. Idkey {}", localPoolName, parentPoolName, idKey);
318 String uniqueIdKey = idUtils.getUniqueKey(parentPoolName, idKey);
319 CompletableFuture<List<Uint32>> futureIdValues = new CompletableFuture<>();
320 CompletableFuture<List<Uint32>> existingFutureIdValue =
321 idUtils.putAllocatedIdsIfAbsent(uniqueIdKey, futureIdValues);
322 if (existingFutureIdValue != null) {
324 return existingFutureIdValue.get();
325 } catch (InterruptedException | ExecutionException e) {
326 LOG.warn("Could not obtain id from existing futureIdValue for idKey {} and pool {}.",
327 idKey, parentPoolName);
328 throw new IdManagerException(e.getMessage(), e);
332 List<Uint32> newIdValuesList = checkForIdInIdEntries(parentPoolName, idKey, uniqueIdKey, futureIdValues,
334 if (!newIdValuesList.isEmpty()) {
335 return newIdValuesList;
337 //This get will not help in concurrent reads. Hence the same read needs to be done again.
338 IdLocalPool localIdPool = getOrCreateLocalIdPool(parentPoolName, localPoolName);
339 LOG.debug("Got pool {}", localIdPool);
340 long newIdValue = -1;
341 localPoolName = localPoolName.intern();
343 newIdValue = getIdFromLocalPoolCache(localIdPool, parentPoolName);
344 newIdValuesList.add(Uint32.valueOf(newIdValue));
346 getRangeOfIds(parentPoolName, localPoolName, size, newIdValuesList, localIdPool, newIdValue);
348 LOG.debug("The newIdValues {} for the idKey {}", newIdValuesList, idKey);
349 idUtils.putReleaseIdLatch(uniqueIdKey, new CountDownLatch(1));
350 UpdateIdEntryJob job = new UpdateIdEntryJob(parentPoolName, localPoolName, idKey, newIdValuesList, txRunner,
351 idUtils, lockManager);
352 jobCoordinator.enqueueJob(parentPoolName, job, IdUtils.RETRY_COUNT);
353 futureIdValues.complete(newIdValuesList);
354 return newIdValuesList;
355 } catch (OperationFailedException | IdManagerException e) {
356 idUtils.unlock(lockManager, uniqueIdKey);
361 private Long getIdFromLocalPoolCache(IdLocalPool localIdPool, String parentPoolName)
362 throws IdManagerException {
364 IdHolder availableIds = localIdPool.getAvailableIds();
365 if (availableIds != null) {
366 Optional<Long> availableId = availableIds.allocateId();
367 if (availableId.isPresent()) {
368 IdHolderSyncJob poolSyncJob =
369 new IdHolderSyncJob(localIdPool.getPoolName(), localIdPool.getAvailableIds(), txRunner,
371 jobCoordinator.enqueueJob(localIdPool.getPoolName(), poolSyncJob, IdUtils.RETRY_COUNT);
372 return availableId.get();
375 IdHolder releasedIds = localIdPool.getReleasedIds();
376 Optional<Long> releasedId = releasedIds.allocateId();
377 if (releasedId.isPresent()) {
378 IdHolderSyncJob poolSyncJob =
379 new IdHolderSyncJob(localIdPool.getPoolName(), localIdPool.getReleasedIds(), txRunner,
381 jobCoordinator.enqueueJob(localIdPool.getPoolName(), poolSyncJob, IdUtils.RETRY_COUNT);
382 return releasedId.get();
384 long idCount = getIdBlockFromParentPool(parentPoolName, localIdPool);
386 if (LOG.isDebugEnabled()) {
387 LOG.debug("Unable to allocate Id block from global pool {}", parentPoolName);
389 throw new IdManagerException(String.format("Ids exhausted for pool : %s", parentPoolName));
395 * Changes made to availableIds and releasedIds will not be persisted to the datastore.
397 private long getIdBlockFromParentPool(String parentPoolName, IdLocalPool localIdPool)
398 throws IdManagerException {
399 if (LOG.isDebugEnabled()) {
400 LOG.debug("Allocating block of id from parent pool {}", parentPoolName);
402 InstanceIdentifier<IdPool> idPoolInstanceIdentifier = idUtils.getIdPoolInstance(parentPoolName);
403 parentPoolName = parentPoolName.intern();
404 idUtils.lock(lockManager, parentPoolName);
406 // Check if the childpool already got id block.
407 long availableIdCount =
408 localIdPool.getAvailableIds().getAvailableIdCount()
409 + localIdPool.getReleasedIds().getAvailableIdCount();
410 if (availableIdCount > 0) {
411 return availableIdCount;
413 return txRunner.applyWithNewReadWriteTransactionAndSubmit(CONFIGURATION, confTx -> {
414 Optional<IdPool> parentIdPool = confTx.read(idPoolInstanceIdentifier).get();
415 if (parentIdPool.isPresent()) {
416 return allocateIdBlockFromParentPool(localIdPool, parentIdPool.get(), confTx);
418 throw new ExpectedDataObjectNotFoundException(LogicalDatastoreType.CONFIGURATION,
419 idPoolInstanceIdentifier);
422 } catch (InterruptedException | ExecutionException e) {
423 throw new IdManagerException("Error getting id block from parent pool", e);
425 idUtils.unlock(lockManager, parentPoolName);
429 private long allocateIdBlockFromParentPool(IdLocalPool localPoolCache, IdPool parentIdPool,
430 TypedWriteTransaction<Configuration> confTx)
431 throws OperationFailedException, IdManagerException {
433 long idCount = allocateIdBlockFromAvailableIdsHolder(localPoolCache, parentIdPool, confTx);
438 ReleasedIdsHolderBuilder releasedIdsBuilderParent = IdUtils.getReleaseIdsHolderBuilder(parentIdPool);
439 final List<DelayedIdEntries> delayedEntries = releasedIdsBuilderParent.getDelayedIdEntries();
440 if (delayedEntries != null) {
441 releasedIdsBuilderParent.setDelayedIdEntries(new ArrayList<>(delayedEntries));
445 idCount = allocateIdBlockFromReleasedIdsHolder(localPoolCache, releasedIdsBuilderParent, parentIdPool,
450 idCount = getIdsFromOtherChildPools(releasedIdsBuilderParent, parentIdPool);
452 if (LOG.isDebugEnabled()) {
453 LOG.debug("Unable to allocate Id block from global pool");
455 throw new IdManagerException(String.format("Ids exhausted for pool : %s", parentIdPool.getPoolName()));
458 idCount = allocateIdBlockFromAvailableIdsHolder(localPoolCache, parentIdPool, confTx);
465 private long getIdsFromOtherChildPools(ReleasedIdsHolderBuilder releasedIdsBuilderParent, IdPool parentIdPool)
466 throws OperationFailedException {
467 @Nullable Map<ChildPoolsKey, ChildPools> childPoolsMap = parentIdPool.nonnullChildPools();
468 // Sorting the child pools on last accessed time so that the pool that
469 // was not accessed for a long time comes first.
471 List<ChildPools> list = childPoolsMap.values().stream().collect(Collectors.toList());
472 list.sort(comparing(ChildPools::getLastAccessTime));
474 long currentTime = System.currentTimeMillis() / 1000;
475 for (ChildPools childPools : childPoolsMap.values()) {
476 if (childPools.getLastAccessTime().toJava() + DEFAULT_IDLE_TIME > currentTime) {
479 if (!Objects.equals(childPools.getChildPoolName(), idUtils.getLocalPoolName(parentIdPool.getPoolName()))) {
480 InstanceIdentifier<IdPool> idPoolInstanceIdentifier = idUtils
481 .getIdPoolInstance(childPools.getChildPoolName());
482 IdPool otherChildPool =
483 singleTxDB.syncRead(LogicalDatastoreType.CONFIGURATION, idPoolInstanceIdentifier);
484 ReleasedIdsHolderBuilder releasedIds = IdUtils.getReleaseIdsHolderBuilder(otherChildPool);
486 List<DelayedIdEntries> delayedIdEntriesChild = releasedIds.getDelayedIdEntries();
487 List<DelayedIdEntries> delayedIdEntriesParent = releasedIdsBuilderParent.getDelayedIdEntries();
488 if (delayedIdEntriesParent == null) {
489 delayedIdEntriesParent = new LinkedList<>();
491 delayedIdEntriesParent.addAll(delayedIdEntriesChild);
492 delayedIdEntriesChild.clear();
494 AvailableIdsHolderBuilder availableIds = idUtils.getAvailableIdsHolderBuilder(otherChildPool);
495 while (idUtils.isIdAvailable(availableIds)) {
496 long cursor = availableIds.getCursor() + 1;
497 delayedIdEntriesParent.add(idUtils.createDelayedIdEntry(cursor, currentTime));
498 availableIds.setCursor(cursor);
501 long totalAvailableIdCount = releasedIds.getDelayedIdEntries().size()
502 + idUtils.getAvailableIdsCount(availableIds);
503 long count = releasedIdsBuilderParent.getAvailableIdCount().toJava() + totalAvailableIdCount;
504 releasedIdsBuilderParent.setDelayedIdEntries(delayedIdEntriesParent).setAvailableIdCount(count);
505 singleTxDB.syncUpdate(LogicalDatastoreType.CONFIGURATION, idPoolInstanceIdentifier,
506 new IdPoolBuilder().withKey(new IdPoolKey(otherChildPool.getPoolName()))
507 .setAvailableIdsHolder(availableIds.build()).setReleasedIdsHolder(releasedIds.build())
509 return totalAvailableIdCount;
515 private long allocateIdBlockFromReleasedIdsHolder(IdLocalPool localIdPool,
516 ReleasedIdsHolderBuilder releasedIdsBuilderParent, IdPool parentIdPool,
517 TypedWriteTransaction<Configuration> confTx) {
518 if (releasedIdsBuilderParent.getAvailableIdCount().toJava() == 0) {
519 LOG.debug("Ids unavailable in releasedIds of parent pool {}", parentIdPool);
522 List<DelayedIdEntries> delayedIdEntriesParent = releasedIdsBuilderParent.getDelayedIdEntries();
523 int idCount = Math.min(delayedIdEntriesParent.size(), parentIdPool.getBlockSize().toJava());
524 List<DelayedIdEntries> idEntriesToBeRemoved = delayedIdEntriesParent.subList(0, idCount);
525 ReleasedIdHolder releasedIds = (ReleasedIdHolder) localIdPool.getReleasedIds();
526 List<DelayedIdEntry> delayedIdEntriesLocalCache = releasedIds.getDelayedEntries();
527 List<DelayedIdEntry> delayedIdEntriesFromParentPool = idEntriesToBeRemoved
529 .map(delayedIdEntry -> new DelayedIdEntry(delayedIdEntry
530 .getId().toJava(), delayedIdEntry.getReadyTimeSec().toJava()))
531 .sorted(comparing(DelayedIdEntry::getReadyTimeSec))
532 .collect(toCollection(ArrayList::new));
533 delayedIdEntriesFromParentPool.addAll(delayedIdEntriesLocalCache);
534 releasedIds.replaceDelayedEntries(delayedIdEntriesFromParentPool);
535 releasedIds.setAvailableIdCount(releasedIds.getAvailableIdCount() + idCount);
536 localIdPool.setReleasedIds(releasedIds);
537 delayedIdEntriesParent.removeAll(idEntriesToBeRemoved);
538 releasedIdsBuilderParent.setDelayedIdEntries(delayedIdEntriesParent);
539 InstanceIdentifier<ReleasedIdsHolder> releasedIdsHolderInstanceIdentifier = InstanceIdentifier
540 .builder(IdPools.class).child(IdPool.class,
541 new IdPoolKey(parentIdPool.getPoolName())).child(ReleasedIdsHolder.class).build();
542 releasedIdsBuilderParent.setAvailableIdCount(releasedIdsBuilderParent.getAvailableIdCount().toJava() - idCount);
543 LOG.debug("Allocated {} ids from releasedIds of parent pool {}", idCount, parentIdPool);
544 confTx.mergeParentStructureMerge(releasedIdsHolderInstanceIdentifier, releasedIdsBuilderParent.build());
548 private long allocateIdBlockFromAvailableIdsHolder(IdLocalPool localIdPool, IdPool parentIdPool,
549 TypedWriteTransaction<Configuration> confTx) {
551 AvailableIdsHolderBuilder availableIdsBuilderParent = idUtils.getAvailableIdsHolderBuilder(parentIdPool);
552 long end = availableIdsBuilderParent.getEnd().toJava();
553 long cur = availableIdsBuilderParent.getCursor();
554 if (!idUtils.isIdAvailable(availableIdsBuilderParent)) {
555 if (LOG.isDebugEnabled()) {
556 LOG.debug("Ids exhausted in parent pool {}", parentIdPool);
560 // Update availableIdsHolder of Local Pool
561 idCount = Math.min(end - cur, parentIdPool.getBlockSize().toJava());
562 AvailableIdHolder availableIds = new AvailableIdHolder(idUtils, cur + 1, cur + idCount);
563 localIdPool.setAvailableIds(availableIds);
564 // Update availableIdsHolder of Global Pool
565 InstanceIdentifier<AvailableIdsHolder> availableIdsHolderInstanceIdentifier = InstanceIdentifier
566 .builder(IdPools.class).child(IdPool.class,
567 new IdPoolKey(parentIdPool.getPoolName())).child(AvailableIdsHolder.class).build();
568 availableIdsBuilderParent.setCursor(cur + idCount);
569 if (LOG.isDebugEnabled()) {
570 LOG.debug("Allocated {} ids from availableIds of global pool {}", idCount, parentIdPool);
572 confTx.mergeParentStructureMerge(availableIdsHolderInstanceIdentifier, availableIdsBuilderParent.build());
576 private ReleaseIdOutputBuilder releaseIdFromLocalPool(String parentPoolName, String localPoolName, String idKey)
577 throws IdManagerException, ReadFailedException, ExecutionException, InterruptedException {
578 String idLatchKey = idUtils.getUniqueKey(parentPoolName, idKey);
579 LOG.debug("Releasing ID {} from pool {}", idKey, localPoolName);
580 CountDownLatch latch = idUtils.getReleaseIdLatch(idLatchKey);
583 if (!latch.await(10, TimeUnit.SECONDS)) {
584 LOG.warn("Timed out while releasing id {} from id pool {}", idKey, parentPoolName);
586 } catch (InterruptedException ignored) {
587 LOG.warn("Thread interrupted while releasing id {} from id pool {}", idKey, parentPoolName);
589 idUtils.removeReleaseIdLatch(idLatchKey);
592 localPoolName = localPoolName.intern();
593 InstanceIdentifier<IdPool> parentIdPoolInstanceIdentifier = idUtils.getIdPoolInstance(parentPoolName);
594 IdPool parentIdPool = singleTxDB.syncRead(LogicalDatastoreType.CONFIGURATION, parentIdPoolInstanceIdentifier);
595 @Nullable Map<IdEntriesKey, IdEntries> idEntries = parentIdPool.getIdEntries();
596 if (idEntries == null) {
597 throw new IdDoesNotExistException(parentPoolName, idKey);
599 InstanceIdentifier<IdEntries> existingId = idUtils.getIdEntry(parentIdPoolInstanceIdentifier, idKey);
600 Optional<IdEntries> existingIdEntryObject =
601 singleTxDB.syncReadOptional(LogicalDatastoreType.CONFIGURATION, existingId);
602 if (!existingIdEntryObject.isPresent()) {
603 LOG.info("Specified Id key {} does not exist in id pool {}", idKey, parentPoolName);
604 idUtils.unlock(lockManager, idLatchKey);
605 throw new IdManagerException(String.format("Specified Id key %s does not exist in id pool %s",
606 idKey, parentPoolName));
608 IdEntries existingIdEntry = existingIdEntryObject.get();
609 List<Uint32> idValuesList = nonnull(existingIdEntry.getIdValue());
610 IdLocalPool localIdPoolCache = localPool.get(parentPoolName);
611 boolean isRemoved = idEntries.values().contains(existingIdEntry);
612 LOG.debug("The entry {} is removed {}", existingIdEntry, isRemoved);
613 updateDelayedEntriesInLocalCache(idValuesList, parentPoolName, localIdPoolCache);
614 IdHolderSyncJob poolSyncJob = new IdHolderSyncJob(localPoolName, localIdPoolCache.getReleasedIds(), txRunner,
616 jobCoordinator.enqueueJob(localPoolName, poolSyncJob, IdUtils.RETRY_COUNT);
617 scheduleCleanUpTask(localIdPoolCache, parentPoolName, parentIdPool.getBlockSize().toJava());
618 LOG.debug("Released id ({}, {}) from pool {}", idKey, idValuesList, localPoolName);
619 // Updating id entries in the parent pool. This will be used for restart scenario
620 UpdateIdEntryJob job = new UpdateIdEntryJob(parentPoolName, localPoolName, idKey, null, txRunner, idUtils,
622 jobCoordinator.enqueueJob(parentPoolName, job, IdUtils.RETRY_COUNT);
623 return new ReleaseIdOutputBuilder().setIdValues(idValuesList);
626 private void scheduleCleanUpTask(final IdLocalPool localIdPoolCache,
627 final String parentPoolName, final int blockSize) {
628 TimerTask scheduledTask = new TimerTask() {
632 new CleanUpJob(localIdPoolCache, txRunner, broker, parentPoolName, blockSize, lockManager,
633 idUtils, jobCoordinator);
634 jobCoordinator.enqueueJob(localIdPoolCache.getPoolName(), job, IdUtils.RETRY_COUNT);
637 cleanJobTimer.schedule(scheduledTask, IdUtils.DEFAULT_DELAY_TIME * 1000);
640 private IdPool createGlobalPool(TypedReadWriteTransaction<Configuration> confTx, String poolName, long low,
641 long high, long blockSize) throws IdManagerException {
643 InstanceIdentifier<IdPool> idPoolInstanceIdentifier = idUtils.getIdPoolInstance(poolName);
645 Optional<IdPool> existingIdPool = confTx.read(idPoolInstanceIdentifier).get();
646 if (!existingIdPool.isPresent()) {
647 if (LOG.isDebugEnabled()) {
648 LOG.debug("Creating new global pool {}", poolName);
650 idPool = idUtils.createGlobalPool(poolName, low, high, blockSize);
651 confTx.mergeParentStructurePut(idPoolInstanceIdentifier, idPool);
653 idPool = existingIdPool.get();
654 if (LOG.isDebugEnabled()) {
655 LOG.debug("GlobalPool exists {}", idPool);
659 } catch (ExecutionException | InterruptedException e) {
660 throw new IdManagerException("Error retrieving the existing id pool for " + poolName, e);
664 private IdLocalPool createLocalPool(TypedWriteTransaction<Configuration> confTx, String localPoolName,
666 throws OperationFailedException, IdManagerException {
667 localPoolName = localPoolName.intern();
668 IdLocalPool idLocalPool = new IdLocalPool(idUtils, localPoolName);
669 allocateIdBlockFromParentPool(idLocalPool, idPool, confTx);
670 String parentPool = idPool.getPoolName();
671 localPool.put(parentPool, idLocalPool);
672 LocalPoolCreateJob job = new LocalPoolCreateJob(idLocalPool, txRunner, idPool.getPoolName(),
673 idPool.getBlockSize().toJava(), idUtils);
674 jobCoordinator.enqueueJob(localPoolName, job, IdUtils.RETRY_COUNT);
678 private void deletePool(String poolName) {
679 LocalPoolDeleteJob job = new LocalPoolDeleteJob(poolName, txRunner, idUtils);
680 jobCoordinator.enqueueJob(poolName, job, IdUtils.RETRY_COUNT);
683 public void poolDeleted(String parentPoolName, String poolName) {
684 IdLocalPool idLocalPool = localPool.get(parentPoolName);
685 if (idLocalPool != null) {
686 if (idLocalPool.getPoolName().equals(poolName)) {
687 localPool.remove(parentPoolName);
692 private void updateDelayedEntriesInLocalCache(List<Uint32> idsList, String parentPoolName,
693 IdLocalPool localPoolCache) {
694 for (Uint32 idValue : idsList) {
695 localPoolCache.getReleasedIds().addId(idValue.toJava());
697 localPool.put(parentPoolName, localPoolCache);
700 private List<Uint32> checkForIdInIdEntries(String parentPoolName, String idKey, String uniqueIdKey,
701 CompletableFuture<List<Uint32>> futureIdValues, boolean hasExistingFutureIdValues)
702 throws IdManagerException, InterruptedException , ExecutionException {
703 InstanceIdentifier<IdPool> parentIdPoolInstanceIdentifier = idUtils.getIdPoolInstance(parentPoolName);
704 InstanceIdentifier<IdEntries> existingId = idUtils.getIdEntry(parentIdPoolInstanceIdentifier, idKey);
705 idUtils.lock(lockManager, uniqueIdKey);
706 List<Uint32> newIdValuesList = new ArrayList<>();
707 Optional<IdEntries> existingIdEntry =
708 singleTxDB.syncReadOptional(LogicalDatastoreType.CONFIGURATION, existingId);
709 if (existingIdEntry.isPresent()) {
710 newIdValuesList = existingIdEntry.get().getIdValue();
711 LOG.debug("Existing ids {} for the key {} ", newIdValuesList, idKey);
712 // Inform other waiting threads about this new value.
713 futureIdValues.complete(newIdValuesList);
714 // This is to avoid stale entries in the map. If this thread had populated the map,
715 // then the entry should be removed.
716 if (!hasExistingFutureIdValues) {
717 idUtils.removeAllocatedIds(uniqueIdKey);
719 idUtils.unlock(lockManager, uniqueIdKey);
720 return newIdValuesList;
722 return newIdValuesList;
725 private IdLocalPool getOrCreateLocalIdPool(String parentPoolName, String localPoolName)
726 throws IdManagerException {
727 IdLocalPool localIdPool = localPool.get(parentPoolName);
728 if (localIdPool == null) {
729 idUtils.lock(lockManager, parentPoolName);
731 // Check if a previous thread that got the cluster-wide lock
732 // first, has created the localPool
734 InstanceIdentifier<IdPool> childIdPoolInstanceIdentifier = idUtils
735 .getIdPoolInstance(localPoolName);
736 Optional<IdPool> childIdPoolOpt = singleTxDB.syncReadOptional(LogicalDatastoreType.CONFIGURATION,
737 childIdPoolInstanceIdentifier);
738 if (childIdPoolOpt.isPresent()) {
739 updateLocalIdPoolCache(childIdPoolOpt.get(), parentPoolName);
742 catch (ExecutionException | InterruptedException ex) {
743 LOG.debug("Failed to read id pool {} due to {}", localPoolName, ex.getMessage());
745 if (localPool.get(parentPoolName) == null) {
747 return txRunner.applyWithNewReadWriteTransactionAndSubmit(CONFIGURATION, confTx -> {
748 InstanceIdentifier<IdPool> parentIdPoolInstanceIdentifier = idUtils
749 .getIdPoolInstance(parentPoolName);
750 Optional<IdPool> parentIdPool = confTx.read(parentIdPoolInstanceIdentifier).get();
751 if (parentIdPool.isPresent()) {
752 // Return localIdPool
753 return createLocalPool(confTx, localPoolName, parentIdPool.get());
755 throw new ExpectedDataObjectNotFoundException(LogicalDatastoreType.CONFIGURATION,
756 parentIdPoolInstanceIdentifier);
759 } catch (InterruptedException | ExecutionException e) {
760 throw new IdManagerException("Error creating a local id pool", e);
763 localIdPool = localPool.get(parentPoolName);
766 idUtils.unlock(lockManager, parentPoolName);
772 private void getRangeOfIds(String parentPoolName, String localPoolName, long size, List<Uint32> newIdValuesList,
773 IdLocalPool localIdPool, long newIdValue) throws ReadFailedException, IdManagerException {
774 InstanceIdentifier<IdPool> parentIdPoolInstanceIdentifier1 = idUtils.getIdPoolInstance(parentPoolName);
775 IdPool parentIdPool = singleTxDB.syncRead(LogicalDatastoreType.CONFIGURATION, parentIdPoolInstanceIdentifier1);
776 long totalAvailableIdCount = localIdPool.getAvailableIds().getAvailableIdCount()
777 + localIdPool.getReleasedIds().getAvailableIdCount();
778 AvailableIdsHolderBuilder availableParentIds = idUtils.getAvailableIdsHolderBuilder(parentIdPool);
779 ReleasedIdsHolderBuilder releasedParentIds = IdUtils.getReleaseIdsHolderBuilder(parentIdPool);
780 totalAvailableIdCount = totalAvailableIdCount + releasedParentIds.getAvailableIdCount().toJava()
781 + idUtils.getAvailableIdsCount(availableParentIds);
782 if (totalAvailableIdCount > size) {
785 newIdValue = getIdFromLocalPoolCache(localIdPool, parentPoolName);
786 } catch (IdManagerException e) {
787 if (LOG.isDebugEnabled()) {
788 LOG.debug("Releasing IDs to pool {}", localPoolName);
790 // Releasing the IDs added in newIdValuesList since
791 // a null list would be returned now, as the
792 // requested size of list IDs exceeds the number of
794 updateDelayedEntriesInLocalCache(newIdValuesList, parentPoolName, localIdPool);
796 newIdValuesList.add(Uint32.valueOf(newIdValue));
800 throw new IdManagerException(String.format("Ids exhausted for pool : %s", parentPoolName));