import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
masterDBBindingDao.readBy(epPolicyTemplate.getSgt());
// find all available epForwardingTemplates and pair those to sxpMasterDBBinding
- final ListenableFuture<List<Pair<MasterDatabaseBinding, EndpointForwardingTemplateBySubnet>>> epForwardingTemplatesRead =
- Futures.transformAsync(sxpMasterDbItemsRead, createReadAndPairTemplateToBindingFunction(epPolicyTemplate));
+ final ListenableFuture<List<Pair<MasterDatabaseBinding, EndpointForwardingTemplateBySubnet>>>
+ epForwardingTemplatesRead = Futures.transformAsync(sxpMasterDbItemsRead,
+ createReadAndPairTemplateToBindingFunction(epPolicyTemplate), MoreExecutors.directExecutor());
// invoke sxpMapperReactor.process for every valid combination of sxpMasterDBBinding, epPolicyTemplate, epForwardingTemplate
final ListenableFuture<List<RpcResult<Void>>> rpcResult =
- Futures.transformAsync(epForwardingTemplatesRead, createProcessAllFunction(epPolicyTemplate));
+ Futures.transformAsync(epForwardingTemplatesRead, createProcessAllFunction(epPolicyTemplate),
+ MoreExecutors.directExecutor());
- Futures.addCallback(rpcResult, RPC_RESULT_FUTURE_CALLBACK);
+ Futures.addCallback(rpcResult, RPC_RESULT_FUTURE_CALLBACK, MoreExecutors.directExecutor());
}
private AsyncFunction<List<Pair<MasterDatabaseBinding, EndpointForwardingTemplateBySubnet>>, List<RpcResult<Void>>>
createProcessAllFunction(final EndpointPolicyTemplateBySgt epPolicyTemplate) {
- return new AsyncFunction<List<Pair<MasterDatabaseBinding, EndpointForwardingTemplateBySubnet>>, List<RpcResult<Void>>>() {
- @Override
- public ListenableFuture<List<RpcResult<Void>>>
- apply(final List<Pair<MasterDatabaseBinding, EndpointForwardingTemplateBySubnet>> input) throws Exception {
- final ListenableFuture<List<RpcResult<Void>>> result;
- if (input == null || input.isEmpty()) {
- LOG.debug("no pair [epForwardingTemplate, ip-sgt-binding] available for sgt: {}", epPolicyTemplate.getSgt());
- result = Futures.immediateFuture(Collections.singletonList(
- RpcResultBuilder.<Void>failed()
- .withError(RpcError.ErrorType.APPLICATION,
- "no pair [epForwardingTemplate, ip-sgt-binding] available for sgt " + epPolicyTemplate.getSgt())
- .build()));
- } else {
- LOG.trace("processing epPolicyTemplate event for sgt: {}", epPolicyTemplate.getSgt());
- List<ListenableFuture<RpcResult<Void>>> allResults = new ArrayList<>(input.size());
- for (Pair<MasterDatabaseBinding, EndpointForwardingTemplateBySubnet> pair : input) {
- final MasterDatabaseBinding sxpMasterDBBinding = pair.getLeft();
- final EndpointForwardingTemplateBySubnet epForwardingTemplate = pair.getRight();
- if (epForwardingTemplate != null) {
- LOG.trace("processing epPolicyTemplate event with resolved sxpMasterDb entry and " +
- "epForwardingTemplate for sgt/ip-prefix: {}/{}",
- sxpMasterDBBinding.getSecurityGroupTag(), sxpMasterDBBinding.getImplementedInterface());
- allResults.add(sxpMapperReactor.processTemplatesAndSxpMasterDB(
- epPolicyTemplate, epForwardingTemplate, sxpMasterDBBinding));
- }
+ return input -> {
+ final ListenableFuture<List<RpcResult<Void>>> result;
+ if (input == null || input.isEmpty()) {
+ LOG.debug("no pair [epForwardingTemplate, ip-sgt-binding] available for sgt: {}",
+ epPolicyTemplate.getSgt());
+ result =
+ Futures.immediateFuture(Collections.singletonList(
+ RpcResultBuilder.<Void>failed().withError(RpcError.ErrorType.APPLICATION,
+ "no pair [epForwardingTemplate, ip-sgt-binding] available for sgt "
+ + epPolicyTemplate.getSgt()).build()));
+ } else {
+ LOG.trace("processing epPolicyTemplate event for sgt: {}", epPolicyTemplate.getSgt());
+ List<ListenableFuture<RpcResult<Void>>> allResults = new ArrayList<>(input.size());
+ for (Pair<MasterDatabaseBinding, EndpointForwardingTemplateBySubnet> pair : input) {
+ final MasterDatabaseBinding sxpMasterDBBinding = pair.getLeft();
+ final EndpointForwardingTemplateBySubnet epForwardingTemplate = pair.getRight();
+ if (epForwardingTemplate != null) {
+ LOG.trace("processing epPolicyTemplate event with resolved sxpMasterDb entry and "
+ + "epForwardingTemplate for sgt/ip-prefix: {}/{}", sxpMasterDBBinding.getSecurityGroupTag(),
+ sxpMasterDBBinding.getImplementedInterface());
+ allResults.add(
+ sxpMapperReactor.processTemplatesAndSxpMasterDB(epPolicyTemplate, epForwardingTemplate,
+ sxpMasterDBBinding));
}
- result = Futures.successfulAsList(allResults);
}
-
- return result;
+ result = Futures.successfulAsList(allResults);
}
+
+ return result;
};
}
- private AsyncFunction<Collection<MasterDatabaseBinding>, List<Pair<MasterDatabaseBinding, EndpointForwardingTemplateBySubnet>>>
- createReadAndPairTemplateToBindingFunction(final EndpointPolicyTemplateBySgt epPolicyTemplate) {
- return new AsyncFunction<Collection<MasterDatabaseBinding>, List<Pair<MasterDatabaseBinding, EndpointForwardingTemplateBySubnet>>>() {
- @Override
- public ListenableFuture<List<Pair<MasterDatabaseBinding, EndpointForwardingTemplateBySubnet>>>
- apply(final Collection<MasterDatabaseBinding> input) throws Exception {
- final ListenableFuture<List<Pair<MasterDatabaseBinding, EndpointForwardingTemplateBySubnet>>> result;
- if (input == null || input.isEmpty()) {
- LOG.debug("no sxpMasterDB entry available for sgt: {}", epPolicyTemplate.getSgt());
- result = Futures.immediateFuture(Collections.emptyList());
- } else {
- LOG.trace("processing sxpMasterDB entries for sgt: {}", epPolicyTemplate.getSgt());
- List<ListenableFuture<Pair<MasterDatabaseBinding, EndpointForwardingTemplateBySubnet>>> allResults =
- new ArrayList<>(input.size());
- for (MasterDatabaseBinding masterDBItem : input) {
- final ListenableFuture<Optional<EndpointForwardingTemplateBySubnet>> epForwardingTemplateRead =
- epForwardingTemplateDao.read(masterDBItem.getIpPrefix());
- allResults.add(EPTemplateUtil.wrapToPair(masterDBItem, epForwardingTemplateRead));
- }
- result = Futures.successfulAsList(allResults);
+ private AsyncFunction<Collection<MasterDatabaseBinding>, List<Pair<MasterDatabaseBinding, EndpointForwardingTemplateBySubnet>>> createReadAndPairTemplateToBindingFunction(
+ final EndpointPolicyTemplateBySgt epPolicyTemplate) {
+ return input -> {
+ final ListenableFuture<List<Pair<MasterDatabaseBinding, EndpointForwardingTemplateBySubnet>>> result;
+ if (input == null || input.isEmpty()) {
+ LOG.debug("no sxpMasterDB entry available for sgt: {}", epPolicyTemplate.getSgt());
+ result = Futures.immediateFuture(Collections.emptyList());
+ } else {
+ LOG.trace("processing sxpMasterDB entries for sgt: {}", epPolicyTemplate.getSgt());
+ List<ListenableFuture<Pair<MasterDatabaseBinding, EndpointForwardingTemplateBySubnet>>>
+ allResults =
+ new ArrayList<>(input.size());
+ for (MasterDatabaseBinding masterDBItem : input) {
+ final ListenableFuture<Optional<EndpointForwardingTemplateBySubnet>>
+ epForwardingTemplateRead =
+ epForwardingTemplateDao.read(masterDBItem.getIpPrefix());
+ allResults.add(EPTemplateUtil.wrapToPair(masterDBItem, epForwardingTemplateRead));
}
-
- return result;
+ result = Futures.successfulAsList(allResults);
}
+
+ return result;
};
}