private static final int DEFAULT_BATCH_SIZE = 1000;
private static final int DEFAULT_BATCH_INTERVAL = 500;
- private final BlockingQueue<ActionableResource> topologyConfigShardBufferQ = new LinkedBlockingQueue<>();
- private final BlockingQueue<ActionableResource> defaultConfigShardBufferQ = new LinkedBlockingQueue<>();
- private final BlockingQueue<ActionableResource> defaultOperationalShardBufferQ = new LinkedBlockingQueue<>();
+ private final BlockingQueue<ActionableResource<?>> topologyConfigShardBufferQ = new LinkedBlockingQueue<>();
+ private final BlockingQueue<ActionableResource<?>> defaultConfigShardBufferQ = new LinkedBlockingQueue<>();
+ private final BlockingQueue<ActionableResource<?>> defaultOperationalShardBufferQ = new LinkedBlockingQueue<>();
private final DataBroker dataBroker;
private final ResourceBatchingManager resourceBatchingManager = ResourceBatchingManager.getInstance();
getQueue(entityType).add(ActionableResources.create(path, data));
}
- public BlockingQueue<ActionableResource> getQueue(EntityType entityType) {
+ public BlockingQueue<ActionableResource<?>> getQueue(EntityType entityType) {
switch (entityType) {
case DEFAULT_CONFIG:
return defaultConfigShardBufferQ;
private static final Logger LOG = LoggerFactory.getLogger(ITMBatchingUtils.class);
- private static final BlockingQueue<ActionableResource> DEFAULT_OPERATIONAL_SHARD_BUFFER_Q
- = new LinkedBlockingQueue<>();
- private static final BlockingQueue<ActionableResource> DEFAULT_CONFIG_SHARD_BUFFER_Q = new LinkedBlockingQueue<>();
- private static final BlockingQueue<ActionableResource> TOPOLOGY_CONFIG_SHARD_BUFFER_Q = new LinkedBlockingQueue<>();
+ private static final BlockingQueue<ActionableResource<?>> DEFAULT_OPERATIONAL_SHARD_BUFFER_Q =
+ new LinkedBlockingQueue<>();
+ private static final BlockingQueue<ActionableResource<?>> DEFAULT_CONFIG_SHARD_BUFFER_Q =
+ new LinkedBlockingQueue<>();
+ private static final BlockingQueue<ActionableResource<?>> TOPOLOGY_CONFIG_SHARD_BUFFER_Q =
+ new LinkedBlockingQueue<>();
private static DataBroker dataBroker;
}
@NonNull
- public static BlockingQueue<ActionableResource> getQueue(EntityType entityType) {
+ public static BlockingQueue<ActionableResource<?>> getQueue(EntityType entityType) {
switch (entityType) {
case DEFAULT_OPERATIONAL:
return DEFAULT_OPERATIONAL_SHARD_BUFFER_Q;
*/
package org.opendaylight.genius.utils.batching;
+import static java.util.Objects.requireNonNull;
+
import com.google.common.util.concurrent.ListenableFuture;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-public interface ActionableResource {
+public abstract class ActionableResource<T extends DataObject> {
- short CREATE = 1;
- short UPDATE = 2;
- short DELETE = 3;
- short READ = 4;
+ static final short CREATE = 1;
+ static final short UPDATE = 2;
+ static final short DELETE = 3;
+ static final short READ = 4;
// MDSAL-534 Merge,Put with no create_missing_parents flag
- short UPDATECONTAINER = 5;
+ static final short UPDATECONTAINER = 5;
+
+ private final InstanceIdentifier<T> path;
+ private final short action;
+
+ // Hidden to prevent subclassing outside of this package
+ ActionableResource(final InstanceIdentifier<T> path, final short action) {
+ this.path = requireNonNull(path);
+ this.action = action;
+ }
- InstanceIdentifier<?> getInstanceIdentifier();
+ final short getAction() {
+ return action;
+ }
- Object getInstance();
+ final @NonNull InstanceIdentifier<T> getInstanceIdentifier() {
+ return path;
+ }
- Object getOldInstance();
+ abstract Object getInstance();
- short getAction();
+ abstract Object getOldInstance();
- ListenableFuture<Void> getResultFuture();
+ abstract ListenableFuture<Void> getResultFuture();
}
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.opendaylight.yangtools.concepts.Identifier;
+import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-class ActionableResourceImpl implements ActionableResource {
+class ActionableResourceImpl<T extends DataObject> extends ActionableResource<T> {
+ private final SettableFuture future = SettableFuture.create();
private final Object instance;
private final Object oldInstance;
private final Object key;
- private final InstanceIdentifier identifier;
- private final short action;
- private final SettableFuture future = SettableFuture.create();
- ActionableResourceImpl(InstanceIdentifier identifier, short action, Object updatedData, Object oldData) {
+ ActionableResourceImpl(InstanceIdentifier<T> path, short action, Object updatedData, Object oldData) {
+ super(path, action);
this.key = null;
- this.action = action;
- this.identifier = requireNonNull(identifier);
this.instance = updatedData;
this.oldInstance = oldData;
}
- ActionableResourceImpl(Identifier key, InstanceIdentifier identifier, short action, Object updatedData,
+ ActionableResourceImpl(Identifier key, InstanceIdentifier<T> path, short action, Object updatedData,
Object oldData) {
+ super(path, action);
this.key = requireNonNull(key);
- this.action = action;
- this.identifier = requireNonNull(identifier);
this.instance = updatedData;
this.oldInstance = oldData;
}
@Override
- public Object getInstance() {
+ final Object getInstance() {
return this.instance;
}
@Override
- public Object getOldInstance() {
+ final Object getOldInstance() {
return this.oldInstance;
}
@Override
- public InstanceIdentifier getInstanceIdentifier() {
- return this.identifier;
- }
-
- @Override
- public short getAction() {
- return action;
- }
-
- @Override
- public ListenableFuture<Void> getResultFuture() {
+ final ListenableFuture<Void> getResultFuture() {
return future;
}
@Override
- public String toString() {
- return key != null ? key.toString() : identifier.toString();
+ public final String toString() {
+ return key != null ? key.toString() : getInstanceIdentifier().toString();
}
}
}
- public static <T extends DataObject> @NonNull ActionableResource create(final InstanceIdentifier<T> path,
+ public static <T extends DataObject> @NonNull ActionableResource<T> create(final InstanceIdentifier<T> path,
final T data) {
- return new ActionableResourceImpl(path, ActionableResource.CREATE, requireNonNull(data), null);
+ return new ActionableResourceImpl<>(path, ActionableResource.CREATE, requireNonNull(data), null);
}
- public static <T extends DataObject> @NonNull ActionableResource create(final Identifier identifier,
+ public static <T extends DataObject> @NonNull ActionableResource<T> create(final Identifier identifier,
final InstanceIdentifier<T> path, final T data) {
- return new ActionableResourceImpl(identifier, path, ActionableResource.CREATE, requireNonNull(data), null);
+ return new ActionableResourceImpl<>(identifier, path, ActionableResource.CREATE, requireNonNull(data), null);
}
- public static <T extends DataObject> @NonNull ActionableResource update(final InstanceIdentifier<T> path,
+ public static <T extends DataObject> @NonNull ActionableResource<T> update(final InstanceIdentifier<T> path,
final T newData) {
- return new ActionableResourceImpl(path, ActionableResource.UPDATE, requireNonNull(newData), null);
+ return new ActionableResourceImpl<>(path, ActionableResource.UPDATE, requireNonNull(newData), null);
}
- public static <T extends DataObject> @NonNull ActionableResource update(final Identifier identifier,
+ public static <T extends DataObject> @NonNull ActionableResource<T> update(final Identifier identifier,
final InstanceIdentifier<T> path, final T newData, final T oldData) {
- return new ActionableResourceImpl(identifier, path, ActionableResource.UPDATE, requireNonNull(newData),
+ return new ActionableResourceImpl<>(identifier, path, ActionableResource.UPDATE, requireNonNull(newData),
oldData);
}
- public static @NonNull ActionableResource delete(final InstanceIdentifier<?> path) {
- return new ActionableResourceImpl(path, ActionableResource.DELETE, null, null);
+ public static @NonNull ActionableResource<?> delete(final InstanceIdentifier<?> path) {
+ return new ActionableResourceImpl<>(path, ActionableResource.DELETE, null, null);
}
- public static <T extends DataObject> @NonNull ActionableResource delete(final Identifier identifier,
+ public static <T extends DataObject> @NonNull ActionableResource<T> delete(final Identifier identifier,
final InstanceIdentifier<T> path, final T data) {
- return new ActionableResourceImpl(identifier, path, ActionableResource.DELETE, data, null);
+ return new ActionableResourceImpl<>(identifier, path, ActionableResource.DELETE, data, null);
}
- public static <T extends DataObject> @NonNull ActionableResource updateContainer(final
+ public static <T extends DataObject> @NonNull ActionableResource<T> updateContainer(final
InstanceIdentifier<T> path, final T newData) {
- return new ActionableResourceImpl(path, ActionableResource.UPDATECONTAINER, requireNonNull(newData),
+ return new ActionableResourceImpl<>(path, ActionableResource.UPDATECONTAINER, requireNonNull(newData),
null);
}
}
CONFIG_INVENTORY(LogicalDatastoreType.CONFIGURATION),
OPERATIONAL_INVENTORY(LogicalDatastoreType.OPERATIONAL);
- BlockingQueue<ActionableResource> queue = new LinkedBlockingQueue<>();
+ BlockingQueue<ActionableResource<?>> queue = new LinkedBlockingQueue<>();
LogicalDatastoreType datastoreType;
ShardResource(LogicalDatastoreType datastoreType) {
return datastoreType;
}
- BlockingQueue<ActionableResource> getQueue() {
+ BlockingQueue<ActionableResource<?>> getQueue() {
return queue;
}
}
- private final ConcurrentHashMap<String, Pair<BlockingQueue<ActionableResource>, ResourceHandler>>
+ private final ConcurrentHashMap<String, Pair<BlockingQueue<ActionableResource<?>>, ResourceHandler>>
resourceHandlerMapper = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, ScheduledExecutorService>
resourceBatchingThreadMapper.values().forEach(ScheduledExecutorService::shutdown);
}
- public void registerBatchableResource(
- String resourceType, final BlockingQueue<ActionableResource> resQueue, final ResourceHandler resHandler) {
+ public void registerBatchableResource(final String resourceType,
+ final BlockingQueue<ActionableResource<?>> resQueue, final ResourceHandler resHandler) {
Preconditions.checkNotNull(resQueue, "ResourceQueue to use for batching cannot not be null.");
Preconditions.checkNotNull(resHandler, "ResourceHandler cannot not be null.");
*/
public <T extends DataObject> FluentFuture<Optional<T>> read(
String resourceType, InstanceIdentifier<T> identifier) throws InterruptedException, ExecutionException {
- BlockingQueue<ActionableResource> queue = getQueue(resourceType);
+ BlockingQueue<ActionableResource<?>> queue = getQueue(resourceType);
if (queue != null) {
if (pendingModificationByResourceType.get(resourceType).contains(identifier)) {
SettableFuture<Optional<T>> readFuture = SettableFuture.create();
public ListenableFuture<Void> merge(ShardResource shardResource, InstanceIdentifier<?> identifier,
DataObject updatedData) {
- BlockingQueue<ActionableResource> queue = shardResource.getQueue();
+ BlockingQueue<ActionableResource<?>> queue = shardResource.getQueue();
if (queue != null) {
beforeModification(shardResource.name(), identifier);
- ActionableResource actResource = new ActionableResourceImpl(
+ ActionableResource<?> actResource = new ActionableResourceImpl<>(
identifier, ActionableResource.UPDATE, updatedData, null/*oldData*/);
queue.add(actResource);
return actResource.getResultFuture();
}
public void merge(String resourceType, InstanceIdentifier<?> identifier, DataObject updatedData) {
- BlockingQueue<ActionableResource> queue = getQueue(resourceType);
+ BlockingQueue<ActionableResource<?>> queue = getQueue(resourceType);
if (queue != null) {
beforeModification(resourceType, identifier);
- ActionableResource actResource = new ActionableResourceImpl(
+ ActionableResource<?> actResource = new ActionableResourceImpl<>(
identifier, ActionableResource.UPDATE, updatedData, null/*oldData*/);
queue.add(actResource);
}
}
public ListenableFuture<Void> delete(ShardResource shardResource, InstanceIdentifier<?> identifier) {
- BlockingQueue<ActionableResource> queue = shardResource.getQueue();
+ BlockingQueue<ActionableResource<?>> queue = shardResource.getQueue();
if (queue != null) {
beforeModification(shardResource.name(), identifier);
- ActionableResource actResource = new ActionableResourceImpl(
+ ActionableResource actResource = new ActionableResourceImpl<>(
identifier, ActionableResource.DELETE, null, null/*oldData*/);
queue.add(actResource);
return actResource.getResultFuture();
}
public void delete(String resourceType, InstanceIdentifier<?> identifier) {
- BlockingQueue<ActionableResource> queue = getQueue(resourceType);
+ BlockingQueue<ActionableResource<?>> queue = getQueue(resourceType);
if (queue != null) {
beforeModification(resourceType, identifier);
- ActionableResource actResource = new ActionableResourceImpl(
+ ActionableResource<?> actResource = new ActionableResourceImpl<>(
identifier, ActionableResource.DELETE, null, null/*oldData*/);
queue.add(actResource);
}
public ListenableFuture<Void> put(ShardResource shardResource, InstanceIdentifier<?> identifier,
DataObject updatedData) {
- BlockingQueue<ActionableResource> queue = shardResource.getQueue();
+ BlockingQueue<ActionableResource<?>> queue = shardResource.getQueue();
if (queue != null) {
beforeModification(shardResource.name(), identifier);
- ActionableResource actResource = new ActionableResourceImpl(
+ ActionableResource<?> actResource = new ActionableResourceImpl<>(
identifier, ActionableResource.CREATE, updatedData, null/*oldData*/);
queue.add(actResource);
return actResource.getResultFuture();
}
public void put(String resourceType, InstanceIdentifier<?> identifier, DataObject updatedData) {
- BlockingQueue<ActionableResource> queue = getQueue(resourceType);
+ BlockingQueue<ActionableResource<?>> queue = getQueue(resourceType);
if (queue != null) {
beforeModification(resourceType, identifier);
- ActionableResource actResource = new ActionableResourceImpl(
+ ActionableResource<?> actResource = new ActionableResourceImpl<>(
identifier, ActionableResource.CREATE, updatedData, null/*oldData*/);
queue.add(actResource);
}
}
- private BlockingQueue<ActionableResource> getQueue(String resourceType) {
+ private BlockingQueue<ActionableResource<?>> getQueue(String resourceType) {
if (resourceHandlerMapper.containsKey(resourceType)) {
return resourceHandlerMapper.get(resourceType).getLeft();
}
@Override
public void run() {
- List<ActionableResource> resList = new ArrayList<>();
+ List<ActionableResource<?>> resList = new ArrayList<>();
try {
- Pair<BlockingQueue<ActionableResource>, ResourceHandler> resMapper =
+ Pair<BlockingQueue<ActionableResource<?>>, ResourceHandler> resMapper =
resourceHandlerMapper.get(resourceType);
if (resMapper == null) {
LOG.error("Unable to find resourceMapper for batching the ResourceType {}", resourceType);
return;
}
- BlockingQueue<ActionableResource> resQueue = resMapper.getLeft();
+ BlockingQueue<ActionableResource<?>> resQueue = resMapper.getLeft();
ResourceHandler resHandler = resMapper.getRight();
resList.add(resQueue.take());
resQueue.drainTo(resList);
private class MdsalDsTask<T extends DataObject> {
String resourceType;
- List<ActionableResource> actResourceList;
+ List<ActionableResource<?>> actResourceList;
- MdsalDsTask(String resourceType, List<ActionableResource> actResourceList) {
+ MdsalDsTask(String resourceType, List<ActionableResource<?>> actResourceList) {
this.resourceType = resourceType;
this.actResourceList = actResourceList;
}
@SuppressWarnings("unchecked")
public void process() {
LOG.trace("Picked up 3 size {} of resourceType {}", actResourceList.size(), resourceType);
- Pair<BlockingQueue<ActionableResource>, ResourceHandler> resMapper =
+ Pair<BlockingQueue<ActionableResource<?>>, ResourceHandler> resMapper =
resourceHandlerMapper.get(resourceType);
if (resMapper == null) {
LOG.error("Unable to find resourceMapper for batching the ResourceType {}", resourceType);
ReadWriteTransaction tx = broker.newReadWriteTransaction();
List<SubTransaction> transactionObjects = new ArrayList<>();
Map<SubTransaction, SettableFuture<Void>> txMap = new HashMap<>();
- for (ActionableResource actResource : actResourceList) {
+ for (ActionableResource<?> actResource : actResourceList) {
int startSize = transactionObjects.size();
switch (actResource.getAction()) {
case ActionableResource.CREATE:
}
}
- private static class ActionableReadResource<T extends DataObject> extends ActionableResourceImpl {
+ private static class ActionableReadResource<T extends DataObject> extends ActionableResourceImpl<T> {
private final SettableFuture<Optional<T>> readFuture;
ActionableReadResource(InstanceIdentifier<T> identifier, SettableFuture<Optional<T>> readFuture) {
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
class FlowBatchingUtils {
- private final BlockingQueue<ActionableResource> inventoryConfigShardBufferQ = new LinkedBlockingQueue<>();
+ private final BlockingQueue<ActionableResource<?>> inventoryConfigShardBufferQ = new LinkedBlockingQueue<>();
public void registerWithBatchManager(ResourceHandler resourceHandler) {
ResourceBatchingManager resBatchingManager = ResourceBatchingManager.getInstance();