/*
 * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.genius.idmanager;
11 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
13 import com.google.common.base.Optional;
14 import com.google.common.util.concurrent.ListenableFuture;
16 import java.util.ArrayList;
17 import java.util.Collections;
18 import java.util.HashMap;
19 import java.util.LinkedList;
20 import java.util.List;
22 import java.util.Timer;
23 import java.util.TimerTask;
24 import java.util.concurrent.CompletableFuture;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ConcurrentMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.CountDownLatch;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.Future;
31 import java.util.concurrent.TimeUnit;
32 import java.util.stream.Collectors;
34 import javax.annotation.PostConstruct;
35 import javax.annotation.PreDestroy;
36 import javax.inject.Inject;
37 import javax.inject.Singleton;
39 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
40 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
41 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
42 import org.opendaylight.genius.datastoreutils.DataStoreJobCoordinator;
43 import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
44 import org.opendaylight.genius.idmanager.ReleasedIdHolder.DelayedIdEntry;
45 import org.opendaylight.genius.idmanager.api.IdManagerMonitor;
46 import org.opendaylight.genius.idmanager.jobs.CleanUpJob;
47 import org.opendaylight.genius.idmanager.jobs.IdHolderSyncJob;
48 import org.opendaylight.genius.idmanager.jobs.LocalPoolCreateJob;
49 import org.opendaylight.genius.idmanager.jobs.LocalPoolDeleteJob;
50 import org.opendaylight.genius.idmanager.jobs.UpdateIdEntryJob;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdInput;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdOutput;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdOutputBuilder;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdRangeInput;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdRangeOutput;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdRangeOutputBuilder;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.CreateIdPoolInput;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.DeleteIdPoolInput;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.IdManagerService;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.IdPools;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.ReleaseIdInput;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.IdPool;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.IdPoolBuilder;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.IdPoolKey;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.AvailableIdsHolder;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.AvailableIdsHolderBuilder;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.ChildPools;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.IdEntries;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.ReleasedIdsHolder;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.ReleasedIdsHolderBuilder;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.released.ids.DelayedIdEntries;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.LockManagerService;
73 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
74 import org.opendaylight.yangtools.yang.common.OperationFailedException;
75 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
76 import org.opendaylight.yangtools.yang.common.RpcResult;
77 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
78 import org.slf4j.Logger;
79 import org.slf4j.LoggerFactory;
82 public class IdManager implements IdManagerService, IdManagerMonitor {
// Class-wide logger.
84 private static final Logger LOG = LoggerFactory.getLogger(IdManager.class);
// 24 hours expressed in seconds; added to a child pool's last access time and compared with the
// current time in seconds when looking for idle child pools to take IDs from.
85 private static final long DEFAULT_IDLE_TIME = 24 * 60 * 60;
// Broker used to open write-only transactions and handed to the datastore jobs created by this class.
87 private final DataBroker broker;
// Wrapper built from the constructor's DataBroker; used for the synchronous reads, updates and deletes.
88 private final SingleTransactionDataBroker singleTxDB;
// Lock service passed to IdUtils.lock/unlock and to the jobs that need it.
89 private final LockManagerService lockManager;
// Helper providing instance identifiers, key/name derivation, builders and the shared allocation maps.
90 private final IdUtils idUtils;
// In-memory cache of local pools, keyed by the PARENT pool name.
92 private final ConcurrentMap<String, IdLocalPool> localPool;
// Timer on which the delayed clean-up tasks are scheduled after an ID is released.
93 private final Timer cleanJobTimer = new Timer();
/**
 * Creates the ID manager and fills the local pool cache from the configuration datastore.
 *
 * @param db data broker used for transactions and wrapped for synchronous datastore access
 * @param lockManager lock service used to serialize pool and ID-key operations
 * @param idUtils helper for identifiers, names and builders
 * @throws ReadFailedException if the initial read of the ID pools fails
 */
public IdManager(DataBroker db, LockManagerService lockManager, IdUtils idUtils) throws ReadFailedException {
    this.idUtils = idUtils;
    this.lockManager = lockManager;
    this.broker = db;
    this.singleTxDB = new SingleTransactionDataBroker(db);
    this.localPool = new ConcurrentHashMap<>();
    // Every collaborator is assigned above because the cache population reads through them.
    populateCache();
}
106 public Map<String, String> getLocalPoolsDetails() {
107 Map<String, String> map = new HashMap<>();
108 localPool.entrySet().stream().forEach(entry -> map.put(entry.getKey(), entry.getValue().toString()));
113 public void start() {
114 LOG.info("{} start", getClass().getSimpleName());
118 public void close() {
119 LOG.info("{} close", getClass().getSimpleName());
122 private void populateCache() throws ReadFailedException {
123 // If IP changes during reboot, then there will be orphaned child pools.
124 InstanceIdentifier<IdPools> idPoolsInstance = idUtils.getIdPools();
125 Optional<IdPools> idPoolsOptional = singleTxDB.syncReadOptional(CONFIGURATION, idPoolsInstance);
126 if (!idPoolsOptional.isPresent()) {
129 IdPools idPools = idPoolsOptional.get();
130 List<IdPool> idPoolList = idPools.getIdPool();
133 .filter(idPool -> idPool.getParentPoolName() != null
134 && !idPool.getParentPoolName().isEmpty()
135 && idUtils.getLocalPoolName(idPool.getParentPoolName())
136 .equals(idPool.getPoolName()))
138 idPool -> updateLocalIdPoolCache(idPool,
139 idPool.getParentPoolName()));
// Rebuilds the in-memory local pool for parentPoolName from the datastore representation idPool
// and stores it in the localPool cache (keyed by the parent pool name).
142 public boolean updateLocalIdPoolCache(IdPool idPool, String parentPoolName) {
// Mirror the persisted available-ID range (start, end, cursor) into an in-memory holder.
143 AvailableIdsHolder availableIdsHolder = idPool.getAvailableIdsHolder();
144 AvailableIdHolder availableIdHolder = new AvailableIdHolder(idUtils, availableIdsHolder.getStart(),
145 availableIdsHolder.getEnd());
146 availableIdHolder.setCur(availableIdsHolder.getCursor());
// Mirror the persisted released-ID holder (delay time and available count).
147 ReleasedIdsHolder releasedIdsHolder = idPool.getReleasedIdsHolder();
148 ReleasedIdHolder releasedIdHolder = new ReleasedIdHolder(idUtils, releasedIdsHolder.getDelayedTimeSec());
149 releasedIdHolder.setAvailableIdCount(releasedIdsHolder.getAvailableIdCount());
150 List<DelayedIdEntries> delayedEntries = releasedIdsHolder.getDelayedIdEntries();
// Default to an empty list when the datastore has no delayed entries.
151 List<DelayedIdEntry> delayedIdEntryInCache = new CopyOnWriteArrayList<>();
152 if (delayedEntries != null) {
// Convert each persisted entry to the cache type, ordered by ascending ready time.
// NOTE(review): this replaces the CopyOnWriteArrayList above with whatever list
// Collectors.toList() produces — confirm the holder does not rely on the concurrent list type.
153 delayedIdEntryInCache = delayedEntries
155 .map(delayedIdEntry -> new DelayedIdEntry(delayedIdEntry
156 .getId(), delayedIdEntry.getReadyTimeSec()))
157 .sorted((idEntry1, idEntry2) -> Long.compare(idEntry1.getReadyTimeSec(),
158 idEntry2.getReadyTimeSec())).collect(Collectors.toList());
160 releasedIdHolder.setDelayedEntries(delayedIdEntryInCache);
// Assemble the local pool and publish it in the cache under the parent pool name.
162 IdLocalPool idLocalPool = new IdLocalPool(idUtils, idPool.getPoolName());
163 idLocalPool.setAvailableIds(availableIdHolder);
164 idLocalPool.setReleasedIds(releasedIdHolder);
165 localPool.put(parentPoolName, idLocalPool);
166 if (LOG.isDebugEnabled()) {
167 LOG.debug("Populating cache for {} with {}", idLocalPool.getPoolName(), idLocalPool);
173 public Future<RpcResult<Void>> createIdPool(CreateIdPoolInput input) {
174 if (LOG.isDebugEnabled()) {
175 LOG.debug("createIdPool called with input {}", input);
177 String poolName = input.getPoolName();
178 long low = input.getLow();
179 long high = input.getHigh();
180 long blockSize = idUtils.computeBlockSize(low, high);
181 Future<RpcResult<Void>> futureResult;
183 idUtils.lock(lockManager, poolName);
184 WriteTransaction tx = broker.newWriteOnlyTransaction();
185 poolName = poolName.intern();
187 idPool = createGlobalPool(tx, poolName, low, high, blockSize);
188 String localPoolName = idUtils.getLocalPoolName(poolName);
189 IdLocalPool idLocalPool = localPool.get(poolName);
190 if (idLocalPool == null) {
191 createLocalPool(tx, localPoolName, idPool);
192 idUtils.updateChildPool(tx, idPool.getPoolName(), localPoolName);
194 tx.submit().checkedGet();
195 futureResult = RpcResultBuilder.<Void>success().buildFuture();
196 } catch (OperationFailedException | IdManagerException e) {
197 futureResult = buildFailedRpcResultFuture("createIdPool failed: " + input.toString(), e);
199 idUtils.unlock(lockManager, poolName);
205 public Future<RpcResult<AllocateIdOutput>> allocateId(AllocateIdInput input) {
206 if (LOG.isDebugEnabled()) {
207 LOG.debug("AllocateId called with input {}", input);
209 String idKey = input.getIdKey();
210 String poolName = input.getPoolName();
211 String localPoolName = idUtils.getLocalPoolName(poolName);
212 long newIdValue = -1;
213 AllocateIdOutputBuilder output = new AllocateIdOutputBuilder();
214 Future<RpcResult<AllocateIdOutput>> futureResult;
215 String uniqueKey = idUtils.getUniqueKey(poolName, idKey);
217 idUtils.lock(lockManager, uniqueKey);
218 //allocateIdFromLocalPool method returns a list of IDs with one element. This element is obtained by get(0)
219 newIdValue = allocateIdFromLocalPool(poolName, localPoolName, idKey, 1).get(0);
220 output.setIdValue(newIdValue);
221 futureResult = RpcResultBuilder.<AllocateIdOutput>success().withResult(output.build()).buildFuture();
222 } catch (OperationFailedException | IdManagerException e) {
223 java.util.Optional.ofNullable(
224 idUtils.allocatedIdMap.remove(idUtils.getUniqueKey(poolName, idKey)))
225 .ifPresent(futureId -> futureId.completeExceptionally(e));
226 futureResult = buildFailedRpcResultFuture("allocateId failed: " + input.toString(), e);
227 idUtils.unlock(lockManager, uniqueKey);
233 public Future<RpcResult<AllocateIdRangeOutput>> allocateIdRange(AllocateIdRangeInput input) {
234 if (LOG.isDebugEnabled()) {
235 LOG.debug("AllocateIdRange called with input {}", input);
237 String idKey = input.getIdKey();
238 String poolName = input.getPoolName();
239 long size = input.getSize();
240 String localPoolName = idUtils.getLocalPoolName(poolName);
241 List<Long> newIdValuesList = new ArrayList<>();
242 AllocateIdRangeOutputBuilder output = new AllocateIdRangeOutputBuilder();
243 Future<RpcResult<AllocateIdRangeOutput>> futureResult;
244 String uniqueKey = idUtils.getUniqueKey(poolName, idKey);
246 idUtils.lock(lockManager, uniqueKey);
247 newIdValuesList = allocateIdFromLocalPool(poolName, localPoolName, idKey, size);
248 Collections.sort(newIdValuesList);
249 output.setIdValues(newIdValuesList);
250 futureResult = RpcResultBuilder.<AllocateIdRangeOutput>success().withResult(output.build()).buildFuture();
251 } catch (OperationFailedException | IdManagerException e) {
252 java.util.Optional.ofNullable(
253 idUtils.allocatedIdMap.remove(idUtils.getUniqueKey(poolName, idKey)))
254 .ifPresent(futureId -> futureId.completeExceptionally(e));
255 futureResult = buildFailedRpcResultFuture("allocateIdRange failed: " + input.toString(), e);
256 idUtils.unlock(lockManager, uniqueKey);
262 public Future<RpcResult<Void>> deleteIdPool(DeleteIdPoolInput input) {
263 if (LOG.isDebugEnabled()) {
264 LOG.debug("DeleteIdPool called with input {}", input);
266 String poolName = input.getPoolName();
267 Future<RpcResult<Void>> futureResult;
269 InstanceIdentifier<IdPool> idPoolToBeDeleted = idUtils.getIdPoolInstance(poolName);
270 poolName = poolName.intern();
271 synchronized (poolName) {
272 IdPool idPool = singleTxDB.syncRead(CONFIGURATION, idPoolToBeDeleted);
273 List<ChildPools> childPoolList = idPool.getChildPools();
274 if (childPoolList != null) {
275 childPoolList.parallelStream().forEach(childPool -> deletePool(childPool.getChildPoolName()));
277 singleTxDB.syncDelete(CONFIGURATION, idPoolToBeDeleted);
278 if (LOG.isDebugEnabled()) {
279 LOG.debug("Deleted id pool {}", poolName);
282 futureResult = RpcResultBuilder.<Void>success().buildFuture();
283 } catch (OperationFailedException e) {
284 futureResult = buildFailedRpcResultFuture("deleteIdPool failed: " + input.toString(), e);
290 public Future<RpcResult<Void>> releaseId(ReleaseIdInput input) {
291 String poolName = input.getPoolName();
292 String idKey = input.getIdKey();
293 Future<RpcResult<Void>> futureResult;
294 String uniqueKey = idUtils.getUniqueKey(poolName, idKey);
296 idUtils.lock(lockManager, uniqueKey);
297 releaseIdFromLocalPool(poolName, idUtils.getLocalPoolName(poolName), idKey);
298 futureResult = RpcResultBuilder.<Void>success().buildFuture();
299 } catch (ReadFailedException | IdManagerException e) {
300 futureResult = buildFailedRpcResultFuture("releaseId failed: " + input.toString(), e);
301 idUtils.unlock(lockManager, uniqueKey);
306 private <T> ListenableFuture<RpcResult<T>> buildFailedRpcResultFuture(String msg, Exception exception) {
307 LOG.error(msg, exception);
308 RpcResultBuilder<T> failedRpcResultBuilder = RpcResultBuilder.failed();
309 failedRpcResultBuilder.withError(ErrorType.APPLICATION, msg, exception);
310 if (exception instanceof OperationFailedException) {
311 failedRpcResultBuilder.withRpcErrors(((OperationFailedException) exception).getErrorList());
313 return failedRpcResultBuilder.buildFuture();
316 private List<Long> allocateIdFromLocalPool(String parentPoolName, String localPoolName,
317 String idKey, long size) throws OperationFailedException, IdManagerException {
318 if (LOG.isDebugEnabled()) {
319 LOG.debug("Allocating id from local pool {}. Parent pool {}. Idkey {}", localPoolName, parentPoolName,
322 List<Long> newIdValuesList = new ArrayList<>();
323 String uniqueIdKey = idUtils.getUniqueKey(parentPoolName, idKey);
324 CompletableFuture<List<Long>> futureIdValues = new CompletableFuture<>();
325 CompletableFuture<List<Long>> existingFutureIdValue =
326 idUtils.allocatedIdMap.putIfAbsent(uniqueIdKey, futureIdValues);
327 if (existingFutureIdValue != null) {
329 newIdValuesList = existingFutureIdValue.get();
330 return newIdValuesList;
331 } catch (InterruptedException | ExecutionException e) {
332 LOG.warn("Could not obtain id from existing futureIdValue for idKey {} and pool {}.",
333 idKey, parentPoolName);
334 throw new IdManagerException(e.getMessage(), e);
337 long newIdValue = -1;
338 localPoolName = localPoolName.intern();
339 InstanceIdentifier<IdPool> parentIdPoolInstanceIdentifier = idUtils.getIdPoolInstance(parentPoolName);
340 InstanceIdentifier<IdEntries> existingId = idUtils.getIdEntry(parentIdPoolInstanceIdentifier, idKey);
341 Optional<IdEntries> existingIdEntry = singleTxDB.syncReadOptional(CONFIGURATION, existingId);
342 if (existingIdEntry.isPresent()) {
343 newIdValuesList = existingIdEntry.get().getIdValue();
344 if (LOG.isDebugEnabled()) {
345 LOG.debug("Existing ids {} for the key {} ", newIdValuesList, idKey);
347 // Inform other waiting threads about this new value.
348 futureIdValues.complete(newIdValuesList);
349 // This is to avoid stale entries in the map. If this thread had populated the map,
350 // then the entry should be removed.
351 if (existingFutureIdValue == null) {
352 idUtils.allocatedIdMap.remove(uniqueIdKey);
354 idUtils.unlock(lockManager, uniqueIdKey);
355 return newIdValuesList;
357 //This get will not help in concurrent reads. Hence the same read needs to be done again.
358 IdLocalPool localIdPool = localPool.get(parentPoolName);
359 if (localIdPool == null) {
360 idUtils.lock(lockManager, parentPoolName);
362 //Check if a previous thread that got the cluster-wide lock first, has created the localPool
363 if (localPool.get(parentPoolName) == null) {
364 WriteTransaction tx = broker.newWriteOnlyTransaction();
365 IdPool parentIdPool = singleTxDB.syncRead(CONFIGURATION, parentIdPoolInstanceIdentifier);
366 localIdPool = createLocalPool(tx, localPoolName, parentIdPool); // Return localIdPool.....
367 tx.submit().checkedGet();
369 localIdPool = localPool.get(parentPoolName);
372 idUtils.unlock(lockManager, parentPoolName);
375 if (LOG.isDebugEnabled()) {
376 LOG.debug("Got pool {}", localIdPool);
379 newIdValue = getIdFromLocalPoolCache(localIdPool, parentPoolName);
380 newIdValuesList.add(newIdValue);
382 IdPool parentIdPool = singleTxDB.syncRead(CONFIGURATION, parentIdPoolInstanceIdentifier);
383 long totalAvailableIdCount = localIdPool.getAvailableIds().getAvailableIdCount()
384 + localIdPool.getReleasedIds().getAvailableIdCount();
385 AvailableIdsHolderBuilder availableParentIds = idUtils.getAvailableIdsHolderBuilder(parentIdPool);
386 ReleasedIdsHolderBuilder releasedParentIds = idUtils.getReleaseIdsHolderBuilder(parentIdPool);
387 totalAvailableIdCount = totalAvailableIdCount + releasedParentIds.getAvailableIdCount()
388 + idUtils.getAvailableIdsCount(availableParentIds);
389 if (totalAvailableIdCount > size) {
392 newIdValue = getIdFromLocalPoolCache(localIdPool, parentPoolName);
393 } catch (OperationFailedException e) {
394 if (LOG.isDebugEnabled()) {
395 LOG.debug("Releasing IDs to pool {}", localPoolName);
397 // Releasing the IDs added in newIdValuesList since a null list would be returned now, as the
398 // requested size of list IDs exceeds the number of available IDs.
399 updateDelayedEntriesInLocalCache(newIdValuesList, parentPoolName, localIdPool);
401 newIdValuesList.add(newIdValue);
405 throw new IdManagerException(String.format("Ids exhausted for pool : %s", parentPoolName));
408 if (LOG.isDebugEnabled()) {
409 LOG.debug("The newIdValues {} for the idKey {}", newIdValuesList, idKey);
411 idUtils.releaseIdLatchMap.put(uniqueIdKey, new CountDownLatch(1));
412 UpdateIdEntryJob job = new UpdateIdEntryJob(parentPoolName, localPoolName, idKey, newIdValuesList, broker,
413 idUtils, lockManager);
414 DataStoreJobCoordinator.getInstance().enqueueJob(parentPoolName, job, IdUtils.RETRY_COUNT);
415 futureIdValues.complete(newIdValuesList);
416 return newIdValuesList;
419 private Long getIdFromLocalPoolCache(IdLocalPool localIdPool, String parentPoolName)
420 throws OperationFailedException, IdManagerException {
422 IdHolder releasedIds = localIdPool.getReleasedIds();
423 Optional<Long> releasedId = Optional.absent();
424 releasedId = releasedIds.allocateId();
425 if (releasedId.isPresent()) {
426 IdHolderSyncJob poolSyncJob =
427 new IdHolderSyncJob(localIdPool.getPoolName(), localIdPool.getReleasedIds(), broker,
429 DataStoreJobCoordinator.getInstance().enqueueJob(localIdPool.getPoolName(),
430 poolSyncJob, IdUtils.RETRY_COUNT);
431 return releasedId.get();
433 Optional<Long> availableId = Optional.absent();
434 IdHolder availableIds = localIdPool.getAvailableIds();
435 if (availableIds != null) {
436 availableId = availableIds.allocateId();
437 if (availableId.isPresent()) {
438 IdHolderSyncJob poolSyncJob =
439 new IdHolderSyncJob(localIdPool.getPoolName(), localIdPool.getAvailableIds(),
441 DataStoreJobCoordinator.getInstance().enqueueJob(localIdPool.getPoolName(),
442 poolSyncJob, IdUtils.RETRY_COUNT);
443 return availableId.get();
446 long idCount = getIdBlockFromParentPool(parentPoolName, localIdPool);
448 if (LOG.isDebugEnabled()) {
449 LOG.debug("Unable to allocate Id block from global pool");
451 throw new IdManagerException(String.format("Ids exhausted for pool : %s", parentPoolName));
457 * Changes made to availableIds and releasedIds will not be persisted to the datastore.
459 private long getIdBlockFromParentPool(String parentPoolName, IdLocalPool localIdPool)
460 throws OperationFailedException, IdManagerException {
461 if (LOG.isDebugEnabled()) {
462 LOG.debug("Allocating block of id from parent pool {}", parentPoolName);
464 InstanceIdentifier<IdPool> idPoolInstanceIdentifier = idUtils.getIdPoolInstance(parentPoolName);
465 parentPoolName = parentPoolName.intern();
466 idUtils.lock(lockManager, parentPoolName);
469 // Check if the childpool already got id block.
470 long availableIdCount =
471 localIdPool.getAvailableIds().getAvailableIdCount()
472 + localIdPool.getReleasedIds().getAvailableIdCount();
473 if (availableIdCount > 0) {
474 return availableIdCount;
476 WriteTransaction tx = broker.newWriteOnlyTransaction();
477 IdPool parentIdPool = singleTxDB.syncRead(CONFIGURATION, idPoolInstanceIdentifier);
478 idCount = allocateIdBlockFromParentPool(localIdPool, parentIdPool, tx);
479 tx.submit().checkedGet();
480 } catch (IdManagerException | NullPointerException e) {
481 LOG.error("Error getting id block from parent pool. {}", e.getMessage());
483 idUtils.unlock(lockManager, parentPoolName);
488 private long allocateIdBlockFromParentPool(IdLocalPool localPoolCache, IdPool parentIdPool, WriteTransaction tx)
489 throws OperationFailedException, IdManagerException {
491 ReleasedIdsHolderBuilder releasedIdsBuilderParent = idUtils.getReleaseIdsHolderBuilder(parentIdPool);
493 idCount = allocateIdBlockFromReleasedIdsHolder(localPoolCache, releasedIdsBuilderParent, parentIdPool, tx);
497 idCount = allocateIdBlockFromAvailableIdsHolder(localPoolCache, parentIdPool, tx);
501 idCount = getIdsFromOtherChildPools(releasedIdsBuilderParent, parentIdPool);
503 if (LOG.isDebugEnabled()) {
504 LOG.debug("Unable to allocate Id block from global pool");
506 throw new IdManagerException(String.format("Ids exhausted for pool : %s", parentIdPool.getPoolName()));
511 private long getIdsFromOtherChildPools(ReleasedIdsHolderBuilder releasedIdsBuilderParent, IdPool parentIdPool)
512 throws OperationFailedException {
513 List<ChildPools> childPoolsList = parentIdPool.getChildPools();
514 // Sorting the child pools on last accessed time so that the pool that
515 // was not accessed for a long time comes first.
516 Collections.sort(childPoolsList,
517 (childPool1, childPool2) -> childPool1.getLastAccessTime().compareTo(childPool2.getLastAccessTime()));
518 long currentTime = System.currentTimeMillis() / 1000;
519 for (ChildPools childPools : childPoolsList) {
520 if (childPools.getLastAccessTime() + DEFAULT_IDLE_TIME > currentTime) {
523 if (!childPools.getChildPoolName().equals(idUtils.getLocalPoolName(parentIdPool.getPoolName()))) {
524 InstanceIdentifier<IdPool> idPoolInstanceIdentifier = idUtils
525 .getIdPoolInstance(childPools.getChildPoolName());
526 IdPool otherChildPool = singleTxDB.syncRead(CONFIGURATION, idPoolInstanceIdentifier);
527 ReleasedIdsHolderBuilder releasedIds = idUtils.getReleaseIdsHolderBuilder(otherChildPool);
529 List<DelayedIdEntries> delayedIdEntriesChild = releasedIds.getDelayedIdEntries();
530 List<DelayedIdEntries> delayedIdEntriesParent = releasedIdsBuilderParent.getDelayedIdEntries();
531 if (delayedIdEntriesParent == null) {
532 delayedIdEntriesParent = new LinkedList<>();
534 delayedIdEntriesParent.addAll(delayedIdEntriesChild);
535 delayedIdEntriesChild.removeAll(delayedIdEntriesChild);
537 AvailableIdsHolderBuilder availableIds = idUtils.getAvailableIdsHolderBuilder(otherChildPool);
538 while (idUtils.isIdAvailable(availableIds)) {
539 long cursor = availableIds.getCursor() + 1;
540 delayedIdEntriesParent.add(idUtils.createDelayedIdEntry(cursor, currentTime));
541 availableIds.setCursor(cursor);
544 long totalAvailableIdCount = releasedIds.getDelayedIdEntries().size()
545 + idUtils.getAvailableIdsCount(availableIds);
546 long count = releasedIdsBuilderParent.getAvailableIdCount() + totalAvailableIdCount;
547 releasedIdsBuilderParent.setDelayedIdEntries(delayedIdEntriesParent).setAvailableIdCount(count);
548 singleTxDB.syncUpdate(CONFIGURATION, idPoolInstanceIdentifier,
549 new IdPoolBuilder().setKey(new IdPoolKey(otherChildPool.getPoolName()))
550 .setAvailableIdsHolder(availableIds.build()).setReleasedIdsHolder(releasedIds.build())
552 return totalAvailableIdCount;
558 private long allocateIdBlockFromReleasedIdsHolder(IdLocalPool localIdPool,
559 ReleasedIdsHolderBuilder releasedIdsBuilderParent, IdPool parentIdPool, WriteTransaction tx) {
560 if (releasedIdsBuilderParent.getAvailableIdCount() == 0) {
561 if (LOG.isDebugEnabled()) {
562 LOG.debug("Ids unavailable in releasedIds of parent pool {}", parentIdPool);
566 List<DelayedIdEntries> delayedIdEntriesParent = releasedIdsBuilderParent.getDelayedIdEntries();
567 int idCount = Math.min(delayedIdEntriesParent.size(), parentIdPool.getBlockSize());
568 List<DelayedIdEntries> idEntriesToBeRemoved = delayedIdEntriesParent.subList(0, idCount);
569 ReleasedIdHolder releasedIds = (ReleasedIdHolder) localIdPool.getReleasedIds();
570 List<DelayedIdEntry> delayedIdEntriesLocalCache = releasedIds.getDelayedEntries();
571 delayedIdEntriesLocalCache = idEntriesToBeRemoved
573 .map(delayedIdEntry -> new DelayedIdEntry(delayedIdEntry
574 .getId(), delayedIdEntry.getReadyTimeSec()))
575 .sorted((idEntry1, idEntry2) -> Long.compare(idEntry1.getReadyTimeSec(),
576 idEntry2.getReadyTimeSec())).collect(Collectors.toList());
577 releasedIds.setDelayedEntries(delayedIdEntriesLocalCache);
578 releasedIds.setAvailableIdCount(releasedIds.getAvailableIdCount() + idCount);
579 localIdPool.setReleasedIds(releasedIds);
580 delayedIdEntriesParent.removeAll(idEntriesToBeRemoved);
581 releasedIdsBuilderParent.setDelayedIdEntries(delayedIdEntriesParent);
582 InstanceIdentifier<ReleasedIdsHolder> releasedIdsHolderInstanceIdentifier = InstanceIdentifier
583 .builder(IdPools.class).child(IdPool.class,
584 new IdPoolKey(parentIdPool.getPoolName())).child(ReleasedIdsHolder.class).build();
585 releasedIdsBuilderParent.setAvailableIdCount(releasedIdsBuilderParent.getAvailableIdCount() - idCount);
586 if (LOG.isDebugEnabled()) {
587 LOG.debug("Allocated {} ids from releasedIds of parent pool {}", idCount, parentIdPool);
589 tx.merge(CONFIGURATION, releasedIdsHolderInstanceIdentifier,
590 releasedIdsBuilderParent.build(), true);
594 private long allocateIdBlockFromAvailableIdsHolder(IdLocalPool localIdPool, IdPool parentIdPool,
595 WriteTransaction tx) {
597 AvailableIdsHolderBuilder availableIdsBuilderParent = idUtils.getAvailableIdsHolderBuilder(parentIdPool);
598 long end = availableIdsBuilderParent.getEnd();
599 long cur = availableIdsBuilderParent.getCursor();
600 if (!idUtils.isIdAvailable(availableIdsBuilderParent)) {
601 if (LOG.isDebugEnabled()) {
602 LOG.debug("Ids exhausted in parent pool {}", parentIdPool);
606 // Update availableIdsHolder of Local Pool
607 idCount = Math.min(end - cur, parentIdPool.getBlockSize());
608 AvailableIdHolder availableIds = new AvailableIdHolder(idUtils, cur + 1, cur + idCount);
609 localIdPool.setAvailableIds(availableIds);
610 // Update availableIdsHolder of Global Pool
611 InstanceIdentifier<AvailableIdsHolder> availableIdsHolderInstanceIdentifier = InstanceIdentifier
612 .builder(IdPools.class).child(IdPool.class,
613 new IdPoolKey(parentIdPool.getPoolName())).child(AvailableIdsHolder.class).build();
614 availableIdsBuilderParent.setCursor(cur + idCount);
615 if (LOG.isDebugEnabled()) {
616 LOG.debug("Allocated {} ids from availableIds of global pool {}", idCount, parentIdPool);
618 tx.merge(CONFIGURATION, availableIdsHolderInstanceIdentifier,
619 availableIdsBuilderParent.build(), true);
623 private void releaseIdFromLocalPool(String parentPoolName, String localPoolName, String idKey)
624 throws ReadFailedException, IdManagerException {
625 String idLatchKey = idUtils.getUniqueKey(parentPoolName, idKey);
626 if (LOG.isDebugEnabled()) {
627 LOG.debug("Releasing ID {} from pool {}", idKey, localPoolName);
629 java.util.Optional.ofNullable(idUtils.releaseIdLatchMap.get(idLatchKey)).ifPresent(latch -> {
631 latch.await(10, TimeUnit.SECONDS);
632 } catch (InterruptedException ignored) {
633 LOG.warn("Thread interrupted while releasing id {} from id pool {}", idKey, parentPoolName);
635 idUtils.releaseIdLatchMap.remove(idLatchKey);
638 localPoolName = localPoolName.intern();
639 InstanceIdentifier<IdPool> parentIdPoolInstanceIdentifier = idUtils.getIdPoolInstance(parentPoolName);
640 IdPool parentIdPool = singleTxDB.syncRead(CONFIGURATION, parentIdPoolInstanceIdentifier);
641 List<IdEntries> idEntries = parentIdPool.getIdEntries();
642 List<IdEntries> newIdEntries = idEntries;
643 if (idEntries == null) {
644 throw new IdManagerException("Id Entries does not exist");
646 InstanceIdentifier<IdEntries> existingId = idUtils.getIdEntry(parentIdPoolInstanceIdentifier, idKey);
647 Optional<IdEntries> existingIdEntryObject = singleTxDB.syncReadOptional(CONFIGURATION, existingId);
648 if (!existingIdEntryObject.isPresent()) {
649 if (LOG.isDebugEnabled()) {
650 LOG.debug("Specified Id key {} does not exist in id pool {}", idKey, parentPoolName);
652 idUtils.unlock(lockManager, idLatchKey);
655 IdEntries existingIdEntry = existingIdEntryObject.get();
656 List<Long> idValuesList = existingIdEntry.getIdValue();
657 IdLocalPool localIdPoolCache = localPool.get(parentPoolName);
658 boolean isRemoved = newIdEntries.remove(existingIdEntry);
659 if (LOG.isDebugEnabled()) {
660 LOG.debug("The entry {} is removed {}", existingIdEntry, isRemoved);
662 updateDelayedEntriesInLocalCache(idValuesList, parentPoolName, localIdPoolCache);
663 IdHolderSyncJob poolSyncJob = new IdHolderSyncJob(localPoolName, localIdPoolCache.getReleasedIds(), broker,
665 DataStoreJobCoordinator.getInstance().enqueueJob(localPoolName, poolSyncJob, IdUtils.RETRY_COUNT);
666 scheduleCleanUpTask(localIdPoolCache, parentPoolName, parentIdPool.getBlockSize());
667 if (LOG.isDebugEnabled()) {
668 LOG.debug("Released id ({}, {}) from pool {}", idKey, idValuesList, localPoolName);
670 // Updating id entries in the parent pool. This will be used for restart scenario
671 UpdateIdEntryJob job = new UpdateIdEntryJob(parentPoolName, localPoolName, idKey, null, broker, idUtils,
673 DataStoreJobCoordinator.getInstance().enqueueJob(parentPoolName, job, IdUtils.RETRY_COUNT);
676 private void scheduleCleanUpTask(final IdLocalPool localIdPoolCache,
677 final String parentPoolName, final int blockSize) {
678 TimerTask scheduledTask = new TimerTask() {
681 CleanUpJob job = new CleanUpJob(localIdPoolCache, broker, parentPoolName, blockSize, lockManager,
683 DataStoreJobCoordinator.getInstance().enqueueJob(localIdPoolCache.getPoolName(), job,
684 IdUtils.RETRY_COUNT);
687 cleanJobTimer.schedule(scheduledTask, IdUtils.DEFAULT_DELAY_TIME * 1000);
690 private IdPool createGlobalPool(WriteTransaction tx, String poolName, long low, long high, long blockSize)
691 throws ReadFailedException {
693 InstanceIdentifier<IdPool> idPoolInstanceIdentifier = idUtils.getIdPoolInstance(poolName);
694 Optional<IdPool> existingIdPool = singleTxDB.syncReadOptional(CONFIGURATION, idPoolInstanceIdentifier);
695 if (!existingIdPool.isPresent()) {
696 if (LOG.isDebugEnabled()) {
697 LOG.debug("Creating new global pool {}", poolName);
699 idPool = idUtils.createGlobalPool(poolName, low, high, blockSize);
700 tx.put(CONFIGURATION, idPoolInstanceIdentifier, idPool, true);
702 idPool = existingIdPool.get();
703 if (LOG.isDebugEnabled()) {
704 LOG.debug("GlobalPool exists {}", idPool);
710 private IdLocalPool createLocalPool(WriteTransaction tx, String localPoolName, IdPool idPool)
711 throws OperationFailedException, IdManagerException {
712 localPoolName = localPoolName.intern();
713 IdLocalPool idLocalPool = new IdLocalPool(idUtils, localPoolName);
714 allocateIdBlockFromParentPool(idLocalPool, idPool, tx);
715 String parentPool = idPool.getPoolName();
716 localPool.put(parentPool, idLocalPool);
717 LocalPoolCreateJob job = new LocalPoolCreateJob(idLocalPool, broker, idPool.getPoolName(),
718 idPool.getBlockSize(), idUtils);
719 DataStoreJobCoordinator.getInstance().enqueueJob(localPoolName, job, IdUtils.RETRY_COUNT);
723 private void deletePool(String poolName) {
724 LocalPoolDeleteJob job = new LocalPoolDeleteJob(poolName, broker, idUtils);
725 DataStoreJobCoordinator.getInstance().enqueueJob(poolName, job, IdUtils.RETRY_COUNT);
728 public void poolDeleted(String parentPoolName, String poolName) {
729 IdLocalPool idLocalPool = localPool.get(parentPoolName);
730 if (idLocalPool != null) {
731 if (idLocalPool.getPoolName().equals(poolName)) {
732 localPool.remove(parentPoolName);
737 private void updateDelayedEntriesInLocalCache(List<Long> idsList, String parentPoolName,
738 IdLocalPool localPoolCache) {
739 for (long idValue : idsList) {
740 localPoolCache.getReleasedIds().addId(idValue);
742 localPool.put(parentPoolName, localPoolCache);
745 public java.util.Optional<IdLocalPool> getIdLocalPool(String parentPoolName) {
746 return java.util.Optional.ofNullable(localPool.get(parentPoolName))
747 .map(localPool -> localPool.deepCopyOf());