import com.google.common.base.Optional;
import com.google.common.util.concurrent.ListenableFuture;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import javax.inject.Singleton;
+
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
ReleasedIdHolder releasedIdHolder = new ReleasedIdHolder(idUtils, releasedIdsHolder.getDelayedTimeSec());
releasedIdHolder.setAvailableIdCount(releasedIdsHolder.getAvailableIdCount());
List<DelayedIdEntries> delayedEntries = releasedIdsHolder.getDelayedIdEntries();
- List<DelayedIdEntry> delayedIdEntryInCache = new ArrayList<>();
+ List<DelayedIdEntry> delayedIdEntryInCache = new CopyOnWriteArrayList<>();
if (delayedEntries != null) {
delayedIdEntryInCache = delayedEntries
.parallelStream()
output.setIdValue(newIdValue);
futureResult = RpcResultBuilder.<AllocateIdOutput>success().withResult(output.build()).buildFuture();
} catch (OperationFailedException | IdManagerException e) {
+ java.util.Optional.ofNullable(
+ idUtils.allocatedIdMap.remove(idUtils.getUniqueKey(poolName, idKey)))
+ .ifPresent(futureId -> futureId.completeExceptionally(e));
futureResult = buildFailedRpcResultFuture("allocateId failed: " + input.toString(), e);
}
return futureResult;
output.setIdValues(newIdValuesList);
futureResult = RpcResultBuilder.<AllocateIdRangeOutput>success().withResult(output.build()).buildFuture();
} catch (OperationFailedException | IdManagerException e) {
+ java.util.Optional.ofNullable(
+ idUtils.allocatedIdMap.remove(idUtils.getUniqueKey(poolName, idKey)))
+ .ifPresent(futureId -> futureId.completeExceptionally(e));
futureResult = buildFailedRpcResultFuture("allocateIdRange failed: " + input.toString(), e);
}
return futureResult;
return failedRpcResultBuilder.buildFuture();
}
- private List<Long> allocateIdFromLocalPool(String parentPoolName, String localPoolName, String idKey, long size)
- throws OperationFailedException, IdManagerException {
+ private List<Long> allocateIdFromLocalPool(String parentPoolName, String localPoolName,
+ String idKey, long size) throws OperationFailedException, IdManagerException {
if (LOG.isDebugEnabled()) {
LOG.debug("Allocating id from local pool {}. Parent pool {}. Idkey {}", localPoolName, parentPoolName,
idKey);
}
- long newIdValue = -1;
List<Long> newIdValuesList = new ArrayList<>();
+ String uniqueIdKey = idUtils.getUniqueKey(parentPoolName, idKey);
+ CompletableFuture<List<Long>> futureIdValues = new CompletableFuture<>();
+ CompletableFuture<List<Long>> existingFutureIdValue =
+ idUtils.allocatedIdMap.putIfAbsent(uniqueIdKey, futureIdValues);
+ if (existingFutureIdValue != null) {
+ try {
+ newIdValuesList = existingFutureIdValue.get();
+ return newIdValuesList;
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.warn("Could not obtain id from existing futureIdValue for idKey {} and pool {}.",
+ idKey, parentPoolName);
+ throw new IdManagerException(e.getMessage(), e);
+ }
+ }
+ long newIdValue = -1;
localPoolName = localPoolName.intern();
InstanceIdentifier<IdPool> parentIdPoolInstanceIdentifier = idUtils.getIdPoolInstance(parentPoolName);
InstanceIdentifier<IdEntries> existingId = idUtils.getIdEntry(parentIdPoolInstanceIdentifier, idKey);
if (LOG.isDebugEnabled()) {
LOG.debug("Existing ids {} for the key {} ", newIdValuesList, idKey);
}
+ // Inform other waiting threads about this new value.
+ futureIdValues.complete(newIdValuesList);
+ // This is to avoid stale entries in the map. If this thread had populated the map,
+ // then the entry should be removed.
+ if (existingFutureIdValue == null) {
+ idUtils.allocatedIdMap.remove(uniqueIdKey);
+ }
return newIdValuesList;
}
//This get will not help in concurrent reads. Hence the same read needs to be done again.
if (LOG.isDebugEnabled()) {
LOG.debug("The newIdValues {} for the idKey {}", newIdValuesList, idKey);
}
- idUtils.releaseIdLatchMap.put(parentPoolName + idKey, new CountDownLatch(1));
+ idUtils.releaseIdLatchMap.put(uniqueIdKey, new CountDownLatch(1));
UpdateIdEntryJob job = new UpdateIdEntryJob(parentPoolName, localPoolName, idKey, newIdValuesList, broker,
idUtils);
DataStoreJobCoordinator.getInstance().enqueueJob(parentPoolName, job, IdUtils.RETRY_COUNT);
+ futureIdValues.complete(newIdValuesList);
return newIdValuesList;
}
throws OperationFailedException, IdManagerException {
while (true) {
IdHolder releasedIds = localIdPool.getReleasedIds();
- Optional<Long> releasedId = releasedIds.allocateId();
+ Optional<Long> releasedId = Optional.absent();
+ releasedId = releasedIds.allocateId();
if (releasedId.isPresent()) {
- IdHolderSyncJob poolSyncJob = new IdHolderSyncJob(localIdPool.getPoolName(), releasedIds, broker,
- idUtils);
- DataStoreJobCoordinator.getInstance().enqueueJob(localIdPool.getPoolName(), poolSyncJob,
- IdUtils.RETRY_COUNT);
+ IdHolderSyncJob poolSyncJob =
+ new IdHolderSyncJob(localIdPool.getPoolName(), localIdPool.getReleasedIds(), broker,
+ idUtils);
+ DataStoreJobCoordinator.getInstance().enqueueJob(localIdPool.getPoolName(),
+ poolSyncJob, IdUtils.RETRY_COUNT);
return releasedId.get();
}
+ Optional<Long> availableId = Optional.absent();
IdHolder availableIds = localIdPool.getAvailableIds();
if (availableIds != null) {
- Optional<Long> availableId = availableIds.allocateId();
+ availableId = availableIds.allocateId();
if (availableId.isPresent()) {
- IdHolderSyncJob poolSyncJob = new IdHolderSyncJob(localIdPool.getPoolName(), availableIds, broker,
- idUtils);
- DataStoreJobCoordinator.getInstance().enqueueJob(localIdPool.getPoolName(), poolSyncJob,
- IdUtils.RETRY_COUNT);
+ IdHolderSyncJob poolSyncJob =
+ new IdHolderSyncJob(localIdPool.getPoolName(), localIdPool.getAvailableIds(),
+ broker, idUtils);
+ DataStoreJobCoordinator.getInstance().enqueueJob(localIdPool.getPoolName(),
+ poolSyncJob, IdUtils.RETRY_COUNT);
return availableId.get();
}
}
private void releaseIdFromLocalPool(String parentPoolName, String localPoolName, String idKey)
throws ReadFailedException, IdManagerException {
- String idLatchKey = parentPoolName + idKey;
+ String idLatchKey = idUtils.getUniqueKey(parentPoolName, idKey);
java.util.Optional.ofNullable(idUtils.releaseIdLatchMap.get(idLatchKey)).ifPresent(latch -> {
try {
latch.await(5, TimeUnit.SECONDS);
import com.google.common.base.Optional;
import com.google.common.util.concurrent.Futures;
+
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Ignore;
@RunWith(MockitoJUnitRunner.class)
public class IdManagerTest {
- Map<InstanceIdentifier<?>, DataObject> configDataStore = new HashMap<>();
+ ConcurrentHashMap<InstanceIdentifier<?>, DataObject> configDataStore = new ConcurrentHashMap<>();
@Mock DataBroker dataBroker;
@Mock ReadOnlyTransaction mockReadTx;
@Mock WriteTransaction mockWriteTx;
configDataStore.put(invocation.getArgumentAt(1, KeyedInstanceIdentifier.class),
invocation.getArgumentAt(2, IdPool.class));
return null;
- }).when(mockWriteTx).put(eq(LogicalDatastoreType.CONFIGURATION), Matchers.any(), any(IdPool.class), eq(true));
+ }).when(mockWriteTx).put(eq(LogicalDatastoreType.CONFIGURATION), Matchers.any(), Matchers.any(), eq(true));
doAnswer(invocation -> {
configDataStore.put(invocation.getArgumentAt(1, KeyedInstanceIdentifier.class),
invocation.getArgumentAt(2, IdPool.class));
return null;
- }).when(mockWriteTx).merge(eq(LogicalDatastoreType.CONFIGURATION), Matchers.any(), any(ChildPools.class),
- eq(true));
+ }).when(mockWriteTx).merge(eq(LogicalDatastoreType.CONFIGURATION), Matchers.any(), Matchers.any(), eq(true));
doAnswer(invocation -> {
configDataStore.put(invocation.getArgumentAt(1, KeyedInstanceIdentifier.class),
invocation.getArgumentAt(2, IdPool.class));
return null;
- }).when(mockWriteTx).merge(eq(LogicalDatastoreType.CONFIGURATION), Matchers.any(), any(IdPool.class), eq(true));
+ }).when(mockWriteTx).merge(eq(LogicalDatastoreType.CONFIGURATION), Matchers.any(), Matchers.any());
doAnswer(invocation -> {
- configDataStore.put(invocation.getArgumentAt(1, KeyedInstanceIdentifier.class), null);
+ configDataStore.remove(invocation.getArgumentAt(1, KeyedInstanceIdentifier.class));
return null;
}).when(mockWriteTx).delete(eq(LogicalDatastoreType.CONFIGURATION), Matchers.<InstanceIdentifier<IdPool>>any());
IdPool pool;
assertTrue(result.get().isSuccessful());
// Just to ensure the local pool is also written. Even if it is not triggered Test case will pass.
- Thread.sleep(100);
+ waitUntilJobIsDone();
assertTrue(configDataStore.size() > 0);
DataObject dataObject = configDataStore.get(localPoolIdentifier);
if (dataObject instanceof IdPool) {
AllocateIdInput allocateIdInput = buildAllocateId(poolName, idKey);
Future<RpcResult<AllocateIdOutput>> result = idManager.allocateId(allocateIdInput);
assertTrue(result.get().isSuccessful());
- Thread.sleep(100);
+ waitUntilJobIsDone();
assertTrue(configDataStore.size() > 0);
DataObject dataObject = configDataStore.get(localPoolIdentifier);
if (dataObject instanceof IdPool) {
ReleaseIdInput releaseIdInput = createReleaseIdInput(poolName, idKey);
Future<RpcResult<Void>> result = idManager.releaseId(releaseIdInput);
assertTrue(result.get().isSuccessful());
- Thread.sleep(100);
+ waitUntilJobIsDone();
+
assertTrue(configDataStore.size() > 0);
DataObject dataObject = configDataStore.get(localPoolIdentifier);
if (dataObject instanceof IdPool) {
AllocateIdInput allocateIdInput = buildAllocateId(poolName, idKey);
Future<RpcResult<AllocateIdOutput>> result = idManager.allocateId(allocateIdInput);
assertTrue(result.get().isSuccessful());
- Thread.sleep(3);
+ waitUntilJobIsDone();
+
assertTrue(configDataStore.size() > 0);
InstanceIdentifier<IdPool> localPoolIdentifier = buildInstanceIdentifier(localPoolName);
DataObject dataObject = configDataStore.get(localPoolIdentifier);
List<IdPool> listOfIdPool = new ArrayList<>();
listOfIdPool.add(localPool);
listOfIdPool.add(globalIdPool);
+ // Pre-loading the map so that we can remove it when it is removed from DS.
+ configDataStore.put(parentPoolIdentifier, globalIdPool);
+ configDataStore.put(localPoolIdentifier, localPool);
setupMocks(listOfIdPool);
Optional<IdPool> expected = Optional.of(globalIdPool);
doReturn(Futures.immediateCheckedFuture(expected)).when(mockReadTx).read(
LogicalDatastoreType.CONFIGURATION, parentPoolIdentifier);
DeleteIdPoolInput deleteIdPoolInput = createDeleteIdPoolInput(poolName);
Future<RpcResult<Void>> result = idManager.deleteIdPool(deleteIdPoolInput);
- Thread.sleep(3);
+ waitUntilJobIsDone();
assertTrue(result.get().isSuccessful());
- assertTrue(configDataStore.size() > 0);
+ assertTrue(configDataStore.size() == 0);
DataObject dataObject = configDataStore.get(localPoolIdentifier);
assertEquals(dataObject, null);
dataObject = configDataStore.get(parentPoolIdentifier);
@Test
public void testMultithreadedIdAllocationFromAvailableIds() throws Exception {
- List<IdPool> listOfIdPool = new ArrayList<>();
- IdPool localIdPool = buildLocalIdPool(blockSize, idStart, idStart + blockSize - 1, idStart - 1, localPoolName,
- poolName).build();
- listOfIdPool.add(localIdPool);
- int poolSize = 10;
- IdPool globalIdPool = buildGlobalIdPool(poolName, idStart, poolSize, blockSize,
- buildChildPool(localPoolName)).build();
- listOfIdPool.add(globalIdPool);
- setupMocks(listOfIdPool);
- doReturn(Futures.immediateCheckedFuture(Optional.of(globalIdPool))).when(mockReadTx).read(
- LogicalDatastoreType.CONFIGURATION, parentPoolIdentifier);
- ExecutorService executor = Executors.newCachedThreadPool();
+ setupMockForMultiThreads(false);
int numberOfTasks = 3;
CountDownLatch latch = new CountDownLatch(numberOfTasks);
Set<Long> idSet = new CopyOnWriteArraySet<>();
- requestIdsConcurrently(latch, numberOfTasks, idSet);
+ requestIdsConcurrently(latch, numberOfTasks, idSet, false);
latch.await();
- Thread.sleep(500);
+ waitUntilJobIsDone();
DataObject dataObject = configDataStore.get(localPoolIdentifier);
if (dataObject instanceof IdPool) {
IdPool pool = (IdPool) dataObject;
}
}
- @Ignore
+ @Test
public void testMultithreadedIdAllocationFromReleasedIds() throws Exception {
- List<DelayedIdEntries> delayedIdEntries = buildDelayedIdEntries(new long[] {100, 101});
- ReleasedIdsHolder expectedReleasedIds = createReleasedIdsHolder(2, delayedIdEntries , 0);
- List<IdPool> listOfIdPool = new ArrayList<>();
- IdPool localIdPool = buildLocalIdPool(blockSize, idStart, idStart + blockSize - 1,
- idStart + blockSize - 1, localPoolName, poolName).build();
- listOfIdPool.add(localIdPool);
- int poolSize = 10;
- IdPool globalIdPool = buildGlobalIdPool(poolName, idStart, poolSize, blockSize,
- buildChildPool(localPoolName)).setReleasedIdsHolder(expectedReleasedIds).build();
- listOfIdPool.add(globalIdPool);
- setupMocks(listOfIdPool);
- doReturn(Futures.immediateCheckedFuture(Optional.of(globalIdPool))).when(mockReadTx).read(
- LogicalDatastoreType.CONFIGURATION, parentPoolIdentifier);
- ExecutorService executor = Executors.newCachedThreadPool();
+ setupMockForMultiThreads(true);
int numberOfTasks = 3;
CountDownLatch latch = new CountDownLatch(numberOfTasks);
Set<Long> idSet = new CopyOnWriteArraySet<>();
- requestIdsConcurrently(latch, numberOfTasks, idSet);
+ requestIdsConcurrently(latch, numberOfTasks, idSet, false);
latch.await();
- Thread.sleep(2000);
+ waitUntilJobIsDone();
DataObject dataObject = configDataStore.get(localPoolIdentifier);
if (dataObject instanceof IdPool) {
IdPool pool = (IdPool) dataObject;
}
}
+ @Test
+ public void testMultithreadedIdAllocationForSameKeyFromAvailableIds() throws Exception {
+ setupMockForMultiThreads(false);
+ int numberOfTasks = 3;
+ CountDownLatch latch = new CountDownLatch(numberOfTasks);
+ Set<Long> idSet = new CopyOnWriteArraySet<>();
+ requestIdsConcurrently(latch, numberOfTasks, idSet, true);
+ latch.await();
+ assertTrue(idSet.size() == 1);
+ waitUntilJobIsDone();
+ DataObject dataObject = configDataStore.get(localPoolIdentifier);
+ if (dataObject instanceof IdPool) {
+ IdPool pool = (IdPool) dataObject;
+ assertTrue(idStart == pool.getAvailableIdsHolder().getCursor());
+ }
+ }
+
+ @Test
+ public void testMultithreadedIdAllocationForSameKeyFromReleasedIds() throws Exception {
+ setupMockForMultiThreads(true);
+ int numberOfTasks = 3;
+ CountDownLatch latch = new CountDownLatch(numberOfTasks);
+ Set<Long> idSet = new CopyOnWriteArraySet<>();
+ requestIdsConcurrently(latch, numberOfTasks, idSet, true);
+ latch.await();
+ assertTrue(idSet.size() == 1);
+ waitUntilJobIsDone();
+ DataObject dataObject = configDataStore.get(localPoolIdentifier);
+ if (dataObject instanceof IdPool) {
+ IdPool pool = (IdPool) dataObject;
+ assertTrue(pool.getReleasedIdsHolder().getAvailableIdCount() == 2);
+ }
+ }
+
+ private void setupMockForMultiThreads(boolean isRelease) throws ReadFailedException, UnknownHostException {
+ List<IdPool> listOfIdPool = new ArrayList<>();
+ IdPoolBuilder localIdPool =
+ buildLocalIdPool(blockSize, idStart, idStart + blockSize, idStart - 1,
+ localPoolName, poolName);
+ int poolSize = 10;
+ IdPool globalIdPool = buildGlobalIdPool(poolName, idStart, poolSize, blockSize,
+ buildChildPool(localPoolName)).build();
+ listOfIdPool.add(globalIdPool);
+ setupMocks(listOfIdPool);
+ doReturn(Futures.immediateCheckedFuture(Optional.of(globalIdPool))).when(mockReadTx).read(
+ LogicalDatastoreType.CONFIGURATION, parentPoolIdentifier);
+
+ List<DelayedIdEntries> delayedIdEntries = buildDelayedIdEntries(new long[] {100, 101, 102});
+ ReleasedIdsHolder expectedReleasedIds = createReleasedIdsHolder(3, delayedIdEntries , 0);
+ if (isRelease) {
+ localIdPool.setReleasedIdsHolder(expectedReleasedIds);
+ }
+ listOfIdPool.add(localIdPool.build());
+ listOfIdPool.add(globalIdPool);
+ setupMocks(listOfIdPool);
+ doAnswer(invocation -> {
+ DataObject result = configDataStore.get(idUtils.getIdEntry(parentPoolIdentifier, idKey));
+ if (result == null) {
+ return Futures.immediateCheckedFuture(Optional.absent());
+ }
+ if (result instanceof IdEntries) {
+ return Futures.immediateCheckedFuture(Optional.of((IdEntries) result));
+ }
+ return Futures.immediateCheckedFuture(Optional.absent());
+ }).when(mockReadTx).read(LogicalDatastoreType.CONFIGURATION, idUtils.getIdEntry(parentPoolIdentifier, idKey));
+
+ doReturn(Futures.immediateCheckedFuture(Optional.of(globalIdPool))).when(mockReadTx).read(
+ LogicalDatastoreType.CONFIGURATION, parentPoolIdentifier);
+ }
+
private InstanceIdentifier<ReleasedIdsHolder> buildReleaseIdsIdentifier(
String poolName) {
InstanceIdentifier<ReleasedIdsHolder> releasedIds = InstanceIdentifier
List<DelayedIdEntries> delayedIdEntriesList = new ArrayList<>();
for (long idValue : idValues) {
DelayedIdEntries delayedIdEntries = new DelayedIdEntriesBuilder().setId(idValue)
- .setReadyTimeSec(System.currentTimeMillis() / 1000).build();
+ .setReadyTimeSec(0L).build();
delayedIdEntriesList.add(delayedIdEntries);
}
return delayedIdEntriesList;
return childPoolsList;
}
- private void requestIdsConcurrently(CountDownLatch latch, int numberOfTasks, Set<Long> idSet) {
+ private void requestIdsConcurrently(CountDownLatch latch, int numberOfTasks, Set<Long> idSet, boolean isSameKey) {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < numberOfTasks; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
- Future<RpcResult<AllocateIdOutput>> result = idManager.allocateId(buildAllocateId(poolName,
- Thread.currentThread().getName()));
- Long idValue = null;
+ Future<RpcResult<AllocateIdOutput>> result;
+ if (!isSameKey) {
+ result = idManager.allocateId(buildAllocateId(poolName,
+ Thread.currentThread().getName()));
+ } else {
+ result = idManager.allocateId(buildAllocateId(poolName,
+ idKey));
+ }
try {
if (result.get().isSuccessful()) {
- idValue = result.get().getResult().getIdValue();
- assertTrue(idValue <= idStart + blockSize - 1);
- assertTrue(idSet.add(idValue));
+ Long idValue = result.get().getResult().getIdValue();
+ assertTrue(idValue <= idStart + blockSize);
+ if (isSameKey) {
+ idSet.add(idValue);
+ } else {
+ assertTrue(idSet.add(idValue));
+ }
} else {
RpcError error = result.get().getErrors().iterator().next();
assertTrue(error.getCause().getMessage().contains("Ids exhausted for pool : " + poolName));
});
}
}
+
+ private void waitUntilJobIsDone() throws InterruptedException {
+ TimeUnit.SECONDS.sleep(1);
+ }
}