import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
public class BaseEndpointServiceImpl implements BaseEndpointService, AutoCloseable {
private final DataBroker dataProvider;
private final EndpointAugmentorRegistryImpl epAugRegistry;
- private static final Function<Void, RpcResult<Void>> TO_SUCCESS_RPC_RESULT = new Function<Void, RpcResult<Void>>() {
-
- @Override
- public RpcResult<Void> apply(Void input) {
- return RpcResultBuilder.<Void>success().build();
- }
- };
+ private static final Function<Void, RpcResult<Void>> TO_SUCCESS_RPC_RESULT =
+ input -> RpcResultBuilder.<Void>success().build();
public BaseEndpointServiceImpl(DataBroker dataProvider, EndpointAugmentorRegistryImpl epAugRegistry) {
this.epAugRegistry = Preconditions.checkNotNull(epAugRegistry);
addAddressEndpointToParents(t, endpoint);
}
- return Futures.transform(t.submit(), TO_SUCCESS_RPC_RESULT);
+ return Futures.transform(t.submit(), TO_SUCCESS_RPC_RESULT, MoreExecutors.directExecutor());
}
private void addContainmentEndpointToChilds(ReadWriteTransaction t, ContainmentEndpoint endpoint) {
}
ListenableFuture<Void> r = t.submit();
- return Futures.transform(r, TO_SUCCESS_RPC_RESULT);
+ return Futures.transform(r, TO_SUCCESS_RPC_RESULT, MoreExecutors.directExecutor());
}
private void deleteAddressEndpointFromParents(ReadWriteTransaction t, AddressEndpoint endpoint) {
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
/**
* Endpoint registry provides a scalable store for accessing and updating
*/
public class EndpointRpcRegistry implements EndpointService, EpRendererAugmentationRegistry, AutoCloseable {
- private static final Logger LOG = LoggerFactory.getLogger(EndpointRpcRegistry.class);
-
- private final DataBroker dataProvider;
-
final static ConcurrentMap<String, EpRendererAugmentation> registeredRenderers =
new ConcurrentHashMap<String, EpRendererAugmentation>();
+ private static final Logger LOG = LoggerFactory.getLogger(EndpointRpcRegistry.class);
+ private final DataBroker dataProvider;
+ private Function<Void, RpcResult<Void>> futureTrans = input -> RpcResultBuilder.<Void>success().build();
/**
* This method registers a renderer for endpoint RPC API. This method
public void onSuccess(Void result) {
}
- });
+ }, MoreExecutors.directExecutor());
}
// TODO Be alagalah - age out endpoint data and remove
}
}
ListenableFuture<Void> r = t.submit();
- return Futures.transform(r, futureTrans);
+ return Futures.transform(r, futureTrans, MoreExecutors.directExecutor());
}
@Override
t.put(LogicalDatastoreType.OPERATIONAL, iid_l3prefix, epL3Prefix);
ListenableFuture<Void> r = t.submit();
- return Futures.transform(r, futureTrans);
+ return Futures.transform(r, futureTrans, MoreExecutors.directExecutor());
}
@Override
}
ListenableFuture<Void> r = t.submit();
- return Futures.transform(r, futureTrans);
+ return Futures.transform(r, futureTrans, MoreExecutors.directExecutor());
}
@Override
}
ListenableFuture<Void> r = t.submit();
- return Futures.transform(r, futureTrans);
+ return Futures.transform(r, futureTrans, MoreExecutors.directExecutor());
}
@Override
}
ListenableFuture<Void> r = t.submit();
- return Futures.transform(r, futureTrans);
+ return Futures.transform(r, futureTrans, MoreExecutors.directExecutor());
}
-
- Function<Void, RpcResult<Void>> futureTrans = new Function<Void, RpcResult<Void>>() {
-
- @Override
- public RpcResult<Void> apply(Void input) {
- return RpcResultBuilder.<Void>success().build();
- }
- };
}
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.base_endpoint.rev160427.parent.child.endpoints.parent.endpoint.choice.parent.containment.endpoint._case.ParentContainmentEndpoint;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.base_endpoint.rev160427.parent.child.endpoints.parent.endpoint.choice.parent.endpoint._case.ParentEndpoint;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.EndpointGroupId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.tenants.tenant.policy.ExternalImplicitGroup;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.renderer.rev151103.renderers.renderer.renderer.policy.configuration.endpoints.AddressEndpointWithLocation;
-import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
public class EndpointUtils {
return false;
}
for (EndpointGroupId epgId : addrEp.getEndpointGroup()) {
- results.add(Futures.transform(
- rTx.read(LogicalDatastoreType.CONFIGURATION,
- IidFactory.externalImplicitGroupIid(addrEp.getTenant(), epgId)),
- new Function<Optional<ExternalImplicitGroup>, Boolean>() {
-
- @Override
- public Boolean apply(Optional<ExternalImplicitGroup> input) {
- return input.isPresent();
- }
- }));
+ results.add(Futures.transform( rTx.read(LogicalDatastoreType.CONFIGURATION,
+ IidFactory.externalImplicitGroupIid(addrEp.getTenant(), epgId)), Optional::isPresent,
+ MoreExecutors.directExecutor()));
}
try {
List<Boolean> list = Futures.allAsList(results).get();
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
LOG.error("Failed to write bridge {}. Message: {}" + bridge.getNodeId().getValue(),
t.getMessage());
}
- });
+ }, MoreExecutors.directExecutor());
}
}
OvsdbBridgeAugmentation ovsdbBridge = node.getAugmentation(OvsdbBridgeAugmentation.class);
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
+
import java.util.List;
import java.util.Optional;
+
+import javax.annotation.Nonnull;
+
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
Futures.addCallback(future, new FutureCallback<Void>() {
@Override
- public void onFailure(Throwable throwable) {
+ public void onFailure(@Nonnull Throwable throwable) {
LOG.error("Could not register renderer {}: {}", renderer, throwable);
}
public void onSuccess(Void result) {
LOG.debug("Renderer {} successfully registered.", renderer);
}
- });
+ }, MoreExecutors.directExecutor());
}
}
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
public class NodeManager {
public void onFailure(@Nullable Throwable throwable) {
LOG.warn("Exception thrown when removing node... {}", throwable);
}
- });
+ }, MoreExecutors.directExecutor());
}
/**
public void onFailure(@Nullable Throwable throwable) {
LOG.warn("Exception thrown when resolving node... {}", throwable);
}
- });
+ }, MoreExecutors.directExecutor());
}
/**
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
}
return syncEndpoints(dataAfter, Create);
}
- });
+ }, MoreExecutors.directExecutor());
}
return Futures.transformAsync(creationResult, new AsyncFunction<Optional<Status>, Boolean>() {
@Override
public Boolean apply(@Nullable final Void input) {
return Boolean.TRUE;
}
- });
+ }, MoreExecutors.directExecutor());
}
- });
+ }, MoreExecutors.directExecutor());
}
/**
return Optional.of(status);
}
- });
+ }, MoreExecutors.directExecutor());
}
private CheckedFuture<Void, TransactionCommitFailedException> reportPolicy(final long version,
import static com.google.common.base.Preconditions.checkNotNull;
+import javax.annotation.Nonnull;
+
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
public class ActionDefinitionListener extends DataTreeChangeHandler<ActionDefinition> {
.child(Renderer.class, new RendererKey(IovisorRenderer.RENDERER_NAME))
.child(Capabilities.class)
.build();
- private static String PUT = "stored";
- private static String DELETED = "removed";
+ private static final String PUT = "stored";
+ private static final String DELETED = "removed";
public ActionDefinitionListener(DataBroker dataBroker) {
super(dataBroker);
}
@Override
- public void onFailure(Throwable t) {
+ public void onFailure(@Nonnull Throwable t) {
LOG.error("Capability of renderer {} was NOT {}: {}", IovisorRenderer.RENDERER_NAME.getValue(),
putOrDeleted, supportedActionDefinitionKey.getActionDefinitionId().getValue(), t);
}
WriteTransaction wTx = dataProvider.newWriteOnlyTransaction();
wTx.delete(LogicalDatastoreType.OPERATIONAL,
CAPABILITIES_IID.child(SupportedActionDefinition.class, supportedActionDefinitionKey));
- Futures.addCallback(wTx.submit(), logDebugResult(supportedActionDefinitionKey, DELETED));
+ Futures.addCallback(wTx.submit(), logDebugResult(supportedActionDefinitionKey, DELETED), MoreExecutors.directExecutor());
}
}
wTx.put(LogicalDatastoreType.OPERATIONAL,
CAPABILITIES_IID.child(SupportedActionDefinition.class, supportedActionDefinition.getKey()),
supportedActionDefinition, true);
- Futures.addCallback(wTx.submit(), logDebugResult(supportedActionDefinition.getKey(), PUT));
+ Futures.addCallback(wTx.submit(), logDebugResult(supportedActionDefinition.getKey(), PUT), MoreExecutors.directExecutor());
}
}
}
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
public class ClassifierDefinitionListener extends DataTreeChangeHandler<ClassifierDefinition> {
wTx.delete(LogicalDatastoreType.OPERATIONAL,
CAPABILITIES_IID.child(SupportedClassifierDefinition.class, supportedClassifierDefinitionKey));
- Futures.addCallback(wTx.submit(), logDebugResult(supportedClassifierDefinitionKey, DELETED));
+ Futures.addCallback(wTx.submit(), logDebugResult(supportedClassifierDefinitionKey, DELETED), MoreExecutors.directExecutor());
}
}
wTx.put(LogicalDatastoreType.OPERATIONAL,
CAPABILITIES_IID.child(SupportedClassifierDefinition.class, supportedClassifierDefinition.getKey()),
supportedClassifierDefinition, true);
- Futures.addCallback(wTx.submit(), logDebugResult(supportedClassifierDefinition.getKey(), PUT));
+ Futures.addCallback(wTx.submit(), logDebugResult(supportedClassifierDefinition.getKey(), PUT), MoreExecutors.directExecutor());
}
}
}
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
+
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
public void onSuccess(Void result) {
LOG.debug("Flow table {} updated.", tableIid);
}
- });
+ }, MoreExecutors.directExecutor());
}
private void updateGroups(DataBroker dataBroker, final NodeId nodeId)
public void onSuccess(Void result) {
LOG.debug("Group table on node {} updated.", nodeId);
}
- });
+ }, MoreExecutors.directExecutor());
}
}
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
/**
* Manage policies on switches by subscribing to updates from the
scheduleUpdate();
return null;
}
- });
+ }, MoreExecutors.directExecutor());
}
/**
public ListenableFuture<Void> apply(List<Void> readyToSubmit) {
return rwTx.submit();
}
- });
+ }, MoreExecutors.directExecutor());
}
private List<Short> getTableIDs() {
}
return null;
}
- });
+ }, MoreExecutors.directExecutor());
}
// **************
import java.util.List;
import java.util.concurrent.Future;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.commons.lang3.tuple.ImmutablePair;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
public class ArpTasker implements PacketProcessingListener {
if (!addFlowResult.isSuccessful()) {
LOG.warn("An ARP Reply to Controller flow was not created on node {} \nErrors: {}",
node.getId().getValue(), addFlowResult.getErrors());
- continue;
}
}
LOG.debug("ARP Reply to Controller flows were created on node {}", node.getId().getValue());
senderIpAddress);
ListenableFuture<RpcResult<Void>> futureSendArpResult = arpSender.sendArp(senderAddress, tpa,
extNcIidAndMac.getLeft());
- Futures.addCallback(futureSendArpResult, logResult(tpa, extNcIidAndMac.getLeft()));
+ Futures.addCallback(futureSendArpResult, logResult(tpa, extNcIidAndMac.getLeft()), MoreExecutors.directExecutor());
}
}
@Override
- public void onFailure(Throwable t) {
+ public void onFailure(@Nonnull Throwable t) {
LOG.error(
"Illegal state - Installation of ARP flows on node {} failed. Node can contain just some ARP flows.",
node.getId(), t);
}
- });
+ }, MoreExecutors.directExecutor());
}
}
import java.util.Collection;
+import javax.annotation.Nonnull;
+
import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
public class ActionDefinitionListener implements ClusteredDataTreeChangeListener<ActionDefinition>, AutoCloseable {
.child(Renderer.class, new RendererKey(OFOverlayRenderer.RENDERER_NAME))
.child(Capabilities.class)
.build();
- private static String PUT = "stored";
- private static String DELETED = "removed";
+ private static final String PUT = "stored";
+ private static final String DELETED = "removed";
private final DataBroker dataProvider;
private final ListenerRegistration<ActionDefinitionListener> registration;
}
@Override
- public void onDataTreeChanged(Collection<DataTreeModification<ActionDefinition>> changes) {
+ public void onDataTreeChanged(@Nonnull Collection<DataTreeModification<ActionDefinition>> changes) {
for (DataTreeModification<ActionDefinition> change : changes) {
DataObjectModification<ActionDefinition> rootNode = change.getRootNode();
wTx.put(LogicalDatastoreType.OPERATIONAL, CAPABILITIES_IID
.child(SupportedActionDefinition.class, supportedActionDefinition.getKey()),
supportedActionDefinition, true);
- Futures.addCallback(wTx.submit(), logDebugResult(supportedActionDefinition.getKey(), PUT));
+ Futures.addCallback(wTx.submit(), logDebugResult(supportedActionDefinition.getKey(), PUT), MoreExecutors
+ .directExecutor());
}
break;
WriteTransaction wTx = dataProvider.newWriteOnlyTransaction();
wTx.delete(LogicalDatastoreType.OPERATIONAL,
CAPABILITIES_IID.child(SupportedActionDefinition.class, supportedActionDefinitionKey));
- Futures.addCallback(wTx.submit(), logDebugResult(supportedActionDefinitionKey, DELETED));
+ Futures.addCallback(wTx.submit(), logDebugResult(supportedActionDefinitionKey, DELETED), MoreExecutors.directExecutor());
}
break;
}
}
@Override
- public void onFailure(Throwable t) {
+ public void onFailure(@Nonnull Throwable t) {
LOG.error("Capability of renderer {} was NOT {}: {}", OFOverlayRenderer.RENDERER_NAME.getValue(),
putOrDeleted, supportedActionDefinitionKey, t);
}
import java.util.Collection;
+import javax.annotation.Nonnull;
+
import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
public class ClassifierDefinitionListener implements ClusteredDataTreeChangeListener<ClassifierDefinition>, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(ClassifierDefinitionListener.class);
- private static final InstanceIdentifier<Capabilities> RENDERER_CAPABILITIES_IID = InstanceIdentifier.builder(Renderers.class)
+ private static final InstanceIdentifier<Capabilities> RENDERER_CAPABILITIES_IID = InstanceIdentifier
+ .builder(Renderers.class)
.child(Renderer.class, new RendererKey(OFOverlayRenderer.RENDERER_NAME))
.child(Capabilities.class)
.build();
- private static String PUT = "stored";
- private static String DELETED = "removed";
+ private static final String PUT = "stored";
+ private static final String DELETED = "removed";
private final DataBroker dataProvider;
private final ListenerRegistration<ClassifierDefinitionListener> registration;
}
@Override
- public void onDataTreeChanged(Collection<DataTreeModification<ClassifierDefinition>> changes) {
+ public void onDataTreeChanged(@Nonnull Collection<DataTreeModification<ClassifierDefinition>> changes) {
for (DataTreeModification<ClassifierDefinition> change : changes) {
DataObjectModification<ClassifierDefinition> rootNode = change.getRootNode();
wTx.put(LogicalDatastoreType.OPERATIONAL, RENDERER_CAPABILITIES_IID
.child(SupportedClassifierDefinition.class, supportedClassifierDefinition.getKey()),
supportedClassifierDefinition, true);
- Futures.addCallback(wTx.submit(), logDebugResult(supportedClassifierDefinition.getKey(), PUT));
+ Futures.addCallback(wTx.submit(), logDebugResult(supportedClassifierDefinition.getKey(), PUT),
+ MoreExecutors.directExecutor());
}
break;
WriteTransaction wTx = dataProvider.newWriteOnlyTransaction();
wTx.delete(LogicalDatastoreType.OPERATIONAL, RENDERER_CAPABILITIES_IID
.child(SupportedClassifierDefinition.class, supportedClassifierDefinitionKey));
- Futures.addCallback(wTx.submit(), logDebugResult(supportedClassifierDefinitionKey, DELETED));
+ Futures.addCallback(wTx.submit(), logDebugResult(supportedClassifierDefinitionKey, DELETED),
+ MoreExecutors.directExecutor());
}
break;
}
-
}
}
}
@Override
- public void onFailure(Throwable t) {
+ public void onFailure(@Nonnull Throwable t) {
LOG.error("Capability of renderer {} was NOT {}: {}", OFOverlayRenderer.RENDERER_NAME.getValue(),
putOrDeleted, supportedClassifierDefinitionKey, t);
}
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import com.google.common.util.concurrent.MoreExecutors;
+
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.MountPointService;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
public void onSuccess(Void result) {
LOG.debug("Renderer {} successfully registered.", renderer);
}
- });
+ }, MoreExecutors.directExecutor());
}
private void unregisterFromRendererManager() {
public void onSuccess(Void result) {
LOG.debug("Renderer {} successfully unregistered.", VppRenderer.NAME);
}
- });
+ }, MoreExecutors.directExecutor());
}
public MountedDataBrokerProvider getMountedDataBroker() {
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.vpp_renderer.rev160425._interface.attributes.InterfaceTypeChoice;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.vpp_renderer.rev160425._interface.attributes._interface.type.choice.TapCase;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.vpp_renderer.rev160425._interface.attributes._interface.type.choice.VhostUserCase;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.vpp_renderer.rev160425.bridge.domain.base.attributes.PhysicalLocationRef;
import org.opendaylight.yang.gen.v1.urn.opendaylight.l2.types.rev130827.VlanId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.v3po.rev170607.VhostUserRole;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.v3po.rev170607.VxlanVni;
futures.add(bridgeDomainManager.createVlanBridgeDomainOnVppNode(input.getId(), vlanId, nodeId));
});
}
- return Futures.transformAsync(Futures.allAsList(futures), voidsToRpcResult());
+ return Futures.transformAsync(Futures.allAsList(futures), voidsToRpcResult(), MoreExecutors.directExecutor());
}
public Future<RpcResult<Void>> deleteVirtualBridgeDomainFromNodes(DeleteVirtualBridgeDomainFromNodesInput input) {
input.getBridgeDomainNode().forEach(nodeId -> {
futures.add(bridgeDomainManager.removeBridgeDomainFromVppNode(input.getBridgeDomainId(), nodeId));
});
- return Futures.transformAsync(Futures.allAsList(futures), voidsToRpcResult());
+ return Futures.transformAsync(Futures.allAsList(futures), voidsToRpcResult(), MoreExecutors.directExecutor());
}
public ListenableFuture<RpcResult<Void>> cloneVirtualBridgeDomainOnNodes(CloneVirtualBridgeDomainOnNodesInput input) {
input.getBridgeDomainId(), vlanId, nodeId));
});
}
- return Futures.transformAsync(Futures.allAsList(futures), voidsToRpcResult());
+ return Futures.transformAsync(Futures.allAsList(futures), voidsToRpcResult(), MoreExecutors.directExecutor());
}
- });
+ }, MoreExecutors.directExecutor());
}
public ListenableFuture<RpcResult<Void>> createInterfaceOnNode(CreateInterfaceOnNodeInput input) {
import com.google.common.collect.HashMultimap;
import com.google.common.collect.SetMultimap;
import com.google.common.eventbus.Subscribe;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
if(!oldVppEndpoint.equals(newVppEndpoint)) {
LOG.debug("Updating vpp endpoint, old EP: {} new EP: {}", oldVppEndpoint, newVppEndpoint);
return Futures.transformAsync(vppEndpointDeleted(oldVppEndpoint),
- (AsyncFunction<Void, Void>) input -> vppEndpointCreated(newVppEndpoint));
+ input -> vppEndpointCreated(newVppEndpoint), MoreExecutors.directExecutor());
}
LOG.debug("Update skipped, provided before/after vpp endpoints are equal");
return Futures.immediateFuture(null);
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
public class VppNodeManager {
public void onFailure(@Nonnull Throwable t) {
LOG.warn("Node synchronization failed. Data before: {} after {}", dataBefore, dataAfter);
}
- });
+ }, MoreExecutors.directExecutor());
}
private boolean isControllerConfigNode(final Node dataAfter, final Node dataBefore) {
java.util.Optional<PhysicalInterface> pubInt = rn.getAugmentation(VppInterfaceAugmentation.class)
.getPhysicalInterface()
.stream()
- .filter(phInt -> phInt.isExternal())
+ .filter(PhysicalInterface::isExternal)
.findFirst();
if (pubInt.isPresent()) {
nodes.put(rn.getNodePath().firstKeyOf(Node.class).getNodeId(), pubInt.get().getInterfaceName());
import java.util.Collection;
import java.util.Collections;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
public class BridgeDomainManagerImpl implements BridgeDomainManager {
CheckedFuture<Optional<GbpBridgeDomain>, ReadFailedException> futureTopology =
rTx.read(LogicalDatastoreType.CONFIGURATION, bridgeDomainConfigIid);
rTx.close();
- return Futures.transformAsync(futureTopology, new AsyncFunction<Optional<GbpBridgeDomain>, Void>() {
-
- @Override
- public ListenableFuture<Void> apply(@Nonnull Optional<GbpBridgeDomain> optBridgeDomainConf) throws Exception {
- if (optBridgeDomainConf.isPresent() && optBridgeDomainConf.get().getPhysicalLocationRef() != null) {
- for (PhysicalLocationRef ref : optBridgeDomainConf.get().getPhysicalLocationRef()) {
- if (!ref.getNodeId().equals(vppNodeId)) {
- LOG.debug("Node {} is not referenced node, skipping", ref.getNodeId());
- continue;
- }
- if (ref.getInterface() != null && ref.getInterface().size() > 0) {
- NodeVbridgeVlanAugment vppNodeVlanAug = new NodeVbridgeVlanAugmentBuilder()
- .setSuperInterface(ref.getInterface().get(0)).build();
- Node vppNode = createBasicVppNodeBuilder(vppNodeId)
- .addAugmentation(NodeVbridgeVlanAugment.class, vppNodeVlanAug).build();
- return createBridgeDomainOnVppNode(bridgeDomainName, topologyAug, vppNode);
- }
+ return Futures.transformAsync(futureTopology, optBridgeDomainConf -> {
+ if (optBridgeDomainConf != null && optBridgeDomainConf.isPresent()
+ && optBridgeDomainConf.get().getPhysicalLocationRef() != null) {
+ for (PhysicalLocationRef ref : optBridgeDomainConf.get().getPhysicalLocationRef()) {
+ if (!ref.getNodeId().equals(vppNodeId)) {
+ LOG.debug("Node {} is not referenced node, skipping", ref.getNodeId());
+ continue;
+ }
+ if (ref.getInterface() != null && ref.getInterface().size() > 0) {
+ NodeVbridgeVlanAugment vppNodeVlanAug = new NodeVbridgeVlanAugmentBuilder()
+ .setSuperInterface(ref.getInterface().get(0)).build();
+ Node vppNode = createBasicVppNodeBuilder(vppNodeId)
+ .addAugmentation(NodeVbridgeVlanAugment.class, vppNodeVlanAug).build();
+ return createBridgeDomainOnVppNode(bridgeDomainName, topologyAug, vppNode);
}
}
- return Futures.immediateFailedFuture(
- new Throwable("Failed to apply config for VLAN bridge domain " + bridgeDomainName));
}
- });
+ return Futures.immediateFailedFuture(
+ new Throwable("Failed to apply config for VLAN bridge domain " + bridgeDomainName));
+ }, MoreExecutors.directExecutor());
}
/**
final CheckedFuture<Optional<Topology>, ReadFailedException> optTopology =
rTx.read(LogicalDatastoreType.CONFIGURATION, topologyIid);
rTx.close();
- return Futures.transformAsync(optTopology, new AsyncFunction<Optional<Topology>, Void>() {
- @Override
- public ListenableFuture<Void> apply(@Nonnull final Optional<Topology> optTopology)
- throws InterruptedException, ExecutionException {
- // Topology
- final SettableFuture<Void> topologyFuture = SettableFuture.create();
- if (!optTopology.isPresent()) {
- final WriteTransaction wTx = dataProvider.newWriteOnlyTransaction();
- final Topology topology = new TopologyBuilder().setKey(topologyKey)
- .setTopologyTypes(VBRIDGE_TOPOLOGY_TYPE)
- .addAugmentation(TopologyVbridgeAugment.class, vBridgeAug)
- .build();
- wTx.put(LogicalDatastoreType.CONFIGURATION, topologyIid, topology, true);
- Futures.addCallback(wTx.submit(), new FutureCallback<Void>() {
+ return Futures.transformAsync(optTopology, topologyOptional -> {
+ // Topology
+ Preconditions.checkNotNull(topologyOptional,
+ "TopologyOptional with topologyIiD: " + topologyIid + " must not be null when creating BD");
+ final SettableFuture<Void> topologyFuture = SettableFuture.create();
+ if (!topologyOptional.isPresent()) {
+ final WriteTransaction wTx = dataProvider.newWriteOnlyTransaction();
+ final Topology topology = new TopologyBuilder().setKey(topologyKey)
+ .setTopologyTypes(VBRIDGE_TOPOLOGY_TYPE)
+ .addAugmentation(TopologyVbridgeAugment.class, vBridgeAug)
+ .build();
+ wTx.put(LogicalDatastoreType.CONFIGURATION, topologyIid, topology, true);
+ Futures.addCallback(wTx.submit(), new FutureCallback<Void>() {
- @Override
- public void onSuccess(@Nullable final Void result) {
- final InstanceIdentifier<BridgeDomain> bridgeDomainStateIid =
- VppIidFactory.getBridgeDomainStateIid(new BridgeDomainKey(bridgeDomainName));
- LOG.debug("Adding a listener on bridge domain state", bridgeDomainName);
- final DataTreeIdentifier<BridgeDomain> bridgeDomainStateIidDTI = new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL,
- bridgeDomainStateIid);
- new ListenableFutureSetter<>(dataProvider, topologyFuture, bridgeDomainStateIidDTI, ModificationType.WRITE);
- }
+ @Override
+ public void onSuccess(@Nullable final Void result) {
+ final InstanceIdentifier<BridgeDomain> bridgeDomainStateIid =
+ VppIidFactory.getBridgeDomainStateIid(new BridgeDomainKey(bridgeDomainName));
+ LOG.debug("Adding a listener on bridge domain state", bridgeDomainName);
+ final DataTreeIdentifier<BridgeDomain> bridgeDomainStateIidDTI = new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL,
+ bridgeDomainStateIid);
+ new ListenableFutureSetter<>(dataProvider, topologyFuture, bridgeDomainStateIidDTI, ModificationType.WRITE);
+ }
- @Override
- public void onFailure(@Nonnull Throwable t) {
- LOG.warn("Request create topology for VBD was not stored to CONF DS. {}", topologyIid, t);
- topologyFuture.setException(new Exception("Cannot send request to VBD."));
- }
- });
- } else {
- topologyFuture.set(null);
- LOG.info("Bridge domain {} already exists", optTopology.get().getTopologyId());
- }
- return Futures.transformAsync(topologyFuture, new AsyncFunction<Void, Void>() {
@Override
- public ListenableFuture<Void> apply(@Nonnull Void topologyInput) throws Exception {
- // Bridge member
- final SettableFuture<Void> futureBridgeMember = SettableFuture.create();
- final InstanceIdentifier<Node> nodeIid = VppIidFactory.getNodeIid(topologyKey, vppNode.getKey());
- LOG.debug("Adding node {} to bridge domain {}", vppNode.getKey(), topologyKey.getTopologyId());
- final WriteTransaction wTx = dataProvider.newWriteOnlyTransaction();
- wTx.put(LogicalDatastoreType.CONFIGURATION, nodeIid, vppNode);
- Futures.addCallback(wTx.submit(), new FutureCallback<Void>() {
+ public void onFailure(@Nonnull Throwable t) {
+ LOG.warn("Request create topology for VBD was not stored to CONF DS. {}", topologyIid, t);
+ topologyFuture.setException(new Exception("Cannot send request to VBD."));
+ }
+ }, MoreExecutors.directExecutor());
+ } else {
+ topologyFuture.set(null);
+ LOG.info("Bridge domain {} already exists", topologyOptional.get().getTopologyId());
+ }
+ return Futures.transformAsync(topologyFuture, topologyInput -> {
+ // Bridge member
+ final SettableFuture<Void> futureBridgeMember = SettableFuture.create();
+ final InstanceIdentifier<Node> nodeIid = VppIidFactory.getNodeIid(topologyKey, vppNode.getKey());
+ LOG.debug("Adding node {} to bridge domain {}", vppNode.getKey(), topologyKey.getTopologyId());
+ final WriteTransaction wTx = dataProvider.newWriteOnlyTransaction();
+ wTx.put(LogicalDatastoreType.CONFIGURATION, nodeIid, vppNode);
+ Futures.addCallback(wTx.submit(), new FutureCallback<Void>() {
- @Override
- public void onSuccess(@Nullable final Void _void) {
- final DataTreeIdentifier<BridgeMember> bridgeMemberIid =
- new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL,
- nodeIid.augmentation(NodeVbridgeAugment.class).child(BridgeMember.class));
- LOG.debug("Request create node in topology for VBD was stored to CONF DS. {}", nodeIid);
- new ListenableFutureSetter<>(dataProvider, futureBridgeMember, bridgeMemberIid,
- ModificationType.WRITE);
- }
+ @Override
+ public void onSuccess(@Nullable final Void _void) {
+ final DataTreeIdentifier<BridgeMember> bridgeMemberIid =
+ new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL,
+ nodeIid.augmentation(NodeVbridgeAugment.class).child(BridgeMember.class));
+ LOG.debug("Request create node in topology for VBD was stored to CONF DS. {}", nodeIid);
+ new ListenableFutureSetter<>(dataProvider, futureBridgeMember, bridgeMemberIid,
+ ModificationType.WRITE);
+ }
- @Override
- public void onFailure(@Nonnull final Throwable t) {
- LOG.warn("Request create node in topology for VBD was not stored to CONF DS. {}", nodeIid, t);
- futureBridgeMember.setException(new Exception("Cannot send request to VBD."));
- }
- });
- return futureBridgeMember;
+ @Override
+ public void onFailure(@Nonnull final Throwable t) {
+ LOG.warn("Request create node in topology for VBD was not stored to CONF DS. {}", nodeIid, t);
+ futureBridgeMember.setException(new Exception("Cannot send request to VBD."));
}
- });
- }
- });
+ }, MoreExecutors.directExecutor());
+ return futureBridgeMember;
+ }, MoreExecutors.directExecutor());
+ }, MoreExecutors.directExecutor());
}
@Override
LOG.warn("Request delete node in topology for VBD was not stored to CONF DS. {}", nodeIid, t);
future.setException(new Exception("Cannot send request to VBD."));
}
- });
+ }, MoreExecutors.directExecutor());
return future;
}
}
import com.google.common.collect.Sets.SetView;
import com.google.common.eventbus.Subscribe;
-
public class VppRendererPolicyManager {
private static final Logger LOG = LoggerFactory.getLogger(VppRendererPolicyManager.class);
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import java.util.HashMap;
import java.util.Map;
}
- });
+ }, MoreExecutors.directExecutor());
return future;
}
}
- });
+ }, MoreExecutors.directExecutor());
return future;
}
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+
import java.util.Collection;
import java.util.HashSet;
import javax.annotation.Nullable;
public ListenableFuture<Collection<Sgt>> findSgtForEP(final AddressEndpointWithLocation endpointWithLocation) {
final EpPolicyTemplateValueKey rawKey = new EpPolicyTemplateValueKey(endpointWithLocation);
final EpPolicyTemplateValueKey key = keyFactory.sortValueKeyLists(rawKey);
- return Futures.transform(epPolicyTemplateReader.readBy(key), TRANSFORM_TEMPLATE_TO_SGT);
+ return Futures.transform(epPolicyTemplateReader.readBy(key), TRANSFORM_TEMPLATE_TO_SGT,
+ MoreExecutors.directExecutor());
}
}
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+
import java.util.Collections;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
final ReadOnlyTransaction rTx = dataBroker.newReadOnlyTransaction();
final CheckedFuture<Optional<AddressEndpoint>, ReadFailedException> read = rTx.read(
LogicalDatastoreType.OPERATIONAL, addressEndpointPath);
- Futures.addCallback(read, SxpListenerUtil.createTxCloseCallback(rTx));
+ Futures.addCallback(read, SxpListenerUtil.createTxCloseCallback(rTx), MoreExecutors.directExecutor());
return read;
}
}
return nextResult;
}
- });
+ }, MoreExecutors.directExecutor());
}
private boolean isSameEpg(RegisterEndpointInput epInput, AddressEndpoint input) {
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+
import java.util.List;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
final CheckedFuture<Optional<SxpEpMapper>, ReadFailedException> read =
rTx.read(LogicalDatastoreType.CONFIGURATION, buildReadPath(key));
- Futures.addCallback(read, SxpListenerUtil.createTxCloseCallback(rTx));
+ Futures.addCallback(read, SxpListenerUtil.createTxCloseCallback(rTx), MoreExecutors.directExecutor());
readResult = Futures.transform(read, new Function<Optional<SxpEpMapper>, Optional<EndpointForwardingTemplateBySubnet>>() {
@Nullable
@Override
public Optional<EndpointForwardingTemplateBySubnet> apply(@Nullable final Optional<SxpEpMapper> input) {
- if (input.isPresent()) {
+ if (input!=null && input.isPresent()) {
// clean cache
cachedDao.invalidateCache();
return Optional.absent();
}
}
- });
+ }, MoreExecutors.directExecutor());
}
return readResult;
}
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+
import java.util.Collection;
import java.util.Collections;
import javax.annotation.Nonnull;
.orElse(Futures.immediateFuture(Optional.absent()))
);
}
- });
+ }, MoreExecutors.directExecutor());
}
private <T> ListenableFuture<Optional<T>> rewrapOptionalToGuavaOptional(final ListenableFuture<java.util.Optional<T>> templateFu) {
.map(origNonnullInput -> Optional.fromNullable(origNonnullInput.orElse(null)))
.orElse(Optional.absent());
}
- }
- );
+ },
+ MoreExecutors.directExecutor());
}
.child(EndpointPolicyTemplateBySgt.class, new EndpointPolicyTemplateBySgtKey(sgt));
wTx.put(LogicalDatastoreType.CONFIGURATION, epPolicyTemplatePath, template, true);
- return Futures.transform(wTx.submit(), createStoreOutcomeHandlerToCollection(template));
+ return Futures.transform(wTx.submit(), createStoreOutcomeHandlerToCollection(template),
+ MoreExecutors.directExecutor());
}
private EndpointPolicyTemplateBySgt buildEpPolicyTemplate(final EpPolicyTemplateValueKey templateLookupKey, final Sgt sgt) {
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+
import java.util.ArrayList;
import java.util.Collection;
import javax.annotation.Nonnull;
final CheckedFuture<Optional<EndpointPolicyTemplateBySgt>, ReadFailedException> read =
rTx.read(LogicalDatastoreType.CONFIGURATION, buildReadPath(key));
- Futures.addCallback(read, SxpListenerUtil.createTxCloseCallback(rTx));
+ Futures.addCallback(read, SxpListenerUtil.createTxCloseCallback(rTx), MoreExecutors.directExecutor());
return Futures.transform(read, new Function<Optional<EndpointPolicyTemplateBySgt>, Optional<EndpointPolicyTemplateBySgt>>() {
@Nullable
}
return input;
}
- });
+ }, MoreExecutors.directExecutor());
}
}
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
ReadableAsyncByKey<Sgt, MasterDatabaseBinding> {
private static final Logger LOG = LoggerFactory.getLogger(MasterDatabaseBindingDaoImpl.class);
- private static final ListenableFuture<Optional<MasterDatabaseBinding>> READ_FUTURE_ABSENT = Futures.immediateFuture(Optional.absent());
+ private static final ListenableFuture<Optional<MasterDatabaseBinding>>
+ READ_FUTURE_ABSENT = Futures.immediateFuture(Optional.absent());
private final DataBroker dataBroker;
private final SimpleCachedDao<IpPrefix, MasterDatabaseBinding> cachedDao;
public Optional<MasterDatabaseBinding> apply(@Nullable final Void input) {
return lookup(cachedDao, key);
}
- });
+ }, MoreExecutors.directExecutor());
}
}
final CheckedFuture<Optional<Topology>, ReadFailedException> read =
rTx.read(LogicalDatastoreType.CONFIGURATION, buildReadPath(null));
- Futures.addCallback(read, SxpListenerUtil.createTxCloseCallback(rTx));
+ Futures.addCallback(read, SxpListenerUtil.createTxCloseCallback(rTx), MoreExecutors.directExecutor());
return Futures.transform(read, new Function<Optional<Topology>, Void>() {
@Nullable
}
return null;
}
- });
+ }, MoreExecutors.directExecutor());
}
private InstanceIdentifier<Topology> buildReadPath(final Sgt key) {
}
return foundGroups;
}
- });
+ }, MoreExecutors.directExecutor());
}
}
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+
import java.util.Collection;
import javax.annotation.Nonnull;
import org.apache.commons.lang3.tuple.Pair;
// find all available epForwardingTemplates and pair those to sxpMasterDBBinding
final ListenableFuture<Optional<Pair<MasterDatabaseBinding, EndpointPolicyTemplateBySgt>>> searchResult =
- Futures.transformAsync(sxpMasterDbItemRead, createReadAndPairTemplateToBindingFunction(epForwardingTemplate));
+ Futures.transformAsync(sxpMasterDbItemRead, createReadAndPairTemplateToBindingFunction(epForwardingTemplate), MoreExecutors
+ .directExecutor());
// invoke sxpMapperReactor.process for every valid combination of sxpMasterDBBinding, epPolicyTemplate, epForwardingTemplate
final ListenableFuture<RpcResult<Void>> rpcResult =
- Futures.transformAsync(searchResult, createProcessAllFunction(epForwardingTemplate));
+ Futures.transformAsync(searchResult, createProcessAllFunction(epForwardingTemplate), MoreExecutors.directExecutor());
- Futures.addCallback(rpcResult, ANY_RPC_FUTURE_CALLBACK);
+ Futures.addCallback(rpcResult, ANY_RPC_FUTURE_CALLBACK, MoreExecutors.directExecutor());
}
private AsyncFunction<Optional<Pair<MasterDatabaseBinding, EndpointPolicyTemplateBySgt>>, RpcResult<Void>>
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
masterDBBindingDao.readBy(epPolicyTemplate.getSgt());
// find all available epForwardingTemplates and pair those to sxpMasterDBBinding
- final ListenableFuture<List<Pair<MasterDatabaseBinding, EndpointForwardingTemplateBySubnet>>> epForwardingTemplatesRead =
- Futures.transformAsync(sxpMasterDbItemsRead, createReadAndPairTemplateToBindingFunction(epPolicyTemplate));
+ final ListenableFuture<List<Pair<MasterDatabaseBinding, EndpointForwardingTemplateBySubnet>>>
+ epForwardingTemplatesRead = Futures.transformAsync(sxpMasterDbItemsRead,
+ createReadAndPairTemplateToBindingFunction(epPolicyTemplate), MoreExecutors.directExecutor());
// invoke sxpMapperReactor.process for every valid combination of sxpMasterDBBinding, epPolicyTemplate, epForwardingTemplate
final ListenableFuture<List<RpcResult<Void>>> rpcResult =
- Futures.transformAsync(epForwardingTemplatesRead, createProcessAllFunction(epPolicyTemplate));
+ Futures.transformAsync(epForwardingTemplatesRead, createProcessAllFunction(epPolicyTemplate),
+ MoreExecutors.directExecutor());
- Futures.addCallback(rpcResult, RPC_RESULT_FUTURE_CALLBACK);
+ Futures.addCallback(rpcResult, RPC_RESULT_FUTURE_CALLBACK, MoreExecutors.directExecutor());
}
private AsyncFunction<List<Pair<MasterDatabaseBinding, EndpointForwardingTemplateBySubnet>>, List<RpcResult<Void>>>
createProcessAllFunction(final EndpointPolicyTemplateBySgt epPolicyTemplate) {
- return new AsyncFunction<List<Pair<MasterDatabaseBinding, EndpointForwardingTemplateBySubnet>>, List<RpcResult<Void>>>() {
- @Override
- public ListenableFuture<List<RpcResult<Void>>>
- apply(final List<Pair<MasterDatabaseBinding, EndpointForwardingTemplateBySubnet>> input) throws Exception {
- final ListenableFuture<List<RpcResult<Void>>> result;
- if (input == null || input.isEmpty()) {
- LOG.debug("no pair [epForwardingTemplate, ip-sgt-binding] available for sgt: {}", epPolicyTemplate.getSgt());
- result = Futures.immediateFuture(Collections.singletonList(
- RpcResultBuilder.<Void>failed()
- .withError(RpcError.ErrorType.APPLICATION,
- "no pair [epForwardingTemplate, ip-sgt-binding] available for sgt " + epPolicyTemplate.getSgt())
- .build()));
- } else {
- LOG.trace("processing epPolicyTemplate event for sgt: {}", epPolicyTemplate.getSgt());
- List<ListenableFuture<RpcResult<Void>>> allResults = new ArrayList<>(input.size());
- for (Pair<MasterDatabaseBinding, EndpointForwardingTemplateBySubnet> pair : input) {
- final MasterDatabaseBinding sxpMasterDBBinding = pair.getLeft();
- final EndpointForwardingTemplateBySubnet epForwardingTemplate = pair.getRight();
- if (epForwardingTemplate != null) {
- LOG.trace("processing epPolicyTemplate event with resolved sxpMasterDb entry and " +
- "epForwardingTemplate for sgt/ip-prefix: {}/{}",
- sxpMasterDBBinding.getSecurityGroupTag(), sxpMasterDBBinding.getImplementedInterface());
- allResults.add(sxpMapperReactor.processTemplatesAndSxpMasterDB(
- epPolicyTemplate, epForwardingTemplate, sxpMasterDBBinding));
- }
+ return input -> {
+ final ListenableFuture<List<RpcResult<Void>>> result;
+ if (input == null || input.isEmpty()) {
+ LOG.debug("no pair [epForwardingTemplate, ip-sgt-binding] available for sgt: {}",
+ epPolicyTemplate.getSgt());
+ result =
+ Futures.immediateFuture(Collections.singletonList(
+ RpcResultBuilder.<Void>failed().withError(RpcError.ErrorType.APPLICATION,
+ "no pair [epForwardingTemplate, ip-sgt-binding] available for sgt "
+ + epPolicyTemplate.getSgt()).build()));
+ } else {
+ LOG.trace("processing epPolicyTemplate event for sgt: {}", epPolicyTemplate.getSgt());
+ List<ListenableFuture<RpcResult<Void>>> allResults = new ArrayList<>(input.size());
+ for (Pair<MasterDatabaseBinding, EndpointForwardingTemplateBySubnet> pair : input) {
+ final MasterDatabaseBinding sxpMasterDBBinding = pair.getLeft();
+ final EndpointForwardingTemplateBySubnet epForwardingTemplate = pair.getRight();
+ if (epForwardingTemplate != null) {
+ LOG.trace("processing epPolicyTemplate event with resolved sxpMasterDb entry and "
+ + "epForwardingTemplate for sgt/ip-prefix: {}/{}", sxpMasterDBBinding.getSecurityGroupTag(),
+ sxpMasterDBBinding.getImplementedInterface());
+ allResults.add(
+ sxpMapperReactor.processTemplatesAndSxpMasterDB(epPolicyTemplate, epForwardingTemplate,
+ sxpMasterDBBinding));
}
- result = Futures.successfulAsList(allResults);
}
-
- return result;
+ result = Futures.successfulAsList(allResults);
}
+
+ return result;
};
}
- private AsyncFunction<Collection<MasterDatabaseBinding>, List<Pair<MasterDatabaseBinding, EndpointForwardingTemplateBySubnet>>>
- createReadAndPairTemplateToBindingFunction(final EndpointPolicyTemplateBySgt epPolicyTemplate) {
- return new AsyncFunction<Collection<MasterDatabaseBinding>, List<Pair<MasterDatabaseBinding, EndpointForwardingTemplateBySubnet>>>() {
- @Override
- public ListenableFuture<List<Pair<MasterDatabaseBinding, EndpointForwardingTemplateBySubnet>>>
- apply(final Collection<MasterDatabaseBinding> input) throws Exception {
- final ListenableFuture<List<Pair<MasterDatabaseBinding, EndpointForwardingTemplateBySubnet>>> result;
- if (input == null || input.isEmpty()) {
- LOG.debug("no sxpMasterDB entry available for sgt: {}", epPolicyTemplate.getSgt());
- result = Futures.immediateFuture(Collections.emptyList());
- } else {
- LOG.trace("processing sxpMasterDB entries for sgt: {}", epPolicyTemplate.getSgt());
- List<ListenableFuture<Pair<MasterDatabaseBinding, EndpointForwardingTemplateBySubnet>>> allResults =
- new ArrayList<>(input.size());
- for (MasterDatabaseBinding masterDBItem : input) {
- final ListenableFuture<Optional<EndpointForwardingTemplateBySubnet>> epForwardingTemplateRead =
- epForwardingTemplateDao.read(masterDBItem.getIpPrefix());
- allResults.add(EPTemplateUtil.wrapToPair(masterDBItem, epForwardingTemplateRead));
- }
- result = Futures.successfulAsList(allResults);
+ private AsyncFunction<Collection<MasterDatabaseBinding>, List<Pair<MasterDatabaseBinding, EndpointForwardingTemplateBySubnet>>> createReadAndPairTemplateToBindingFunction(
+ final EndpointPolicyTemplateBySgt epPolicyTemplate) {
+ return input -> {
+ final ListenableFuture<List<Pair<MasterDatabaseBinding, EndpointForwardingTemplateBySubnet>>> result;
+ if (input == null || input.isEmpty()) {
+ LOG.debug("no sxpMasterDB entry available for sgt: {}", epPolicyTemplate.getSgt());
+ result = Futures.immediateFuture(Collections.emptyList());
+ } else {
+ LOG.trace("processing sxpMasterDB entries for sgt: {}", epPolicyTemplate.getSgt());
+ List<ListenableFuture<Pair<MasterDatabaseBinding, EndpointForwardingTemplateBySubnet>>>
+ allResults =
+ new ArrayList<>(input.size());
+ for (MasterDatabaseBinding masterDBItem : input) {
+ final ListenableFuture<Optional<EndpointForwardingTemplateBySubnet>>
+ epForwardingTemplateRead =
+ epForwardingTemplateDao.read(masterDBItem.getIpPrefix());
+ allResults.add(EPTemplateUtil.wrapToPair(masterDBItem, epForwardingTemplateRead));
}
-
- return result;
+ result = Futures.successfulAsList(allResults);
}
+
+ return result;
};
}
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+
import java.util.Collection;
import javax.annotation.Nonnull;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
}
return result;
}
- });
+ }, MoreExecutors.directExecutor());
- Futures.addCallback(rpcResult, RPC_POLICY_RESULT_FUTURE_CALLBACK);
+ Futures.addCallback(rpcResult, RPC_POLICY_RESULT_FUTURE_CALLBACK, MoreExecutors.directExecutor());
}
@Override
import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nonnull;
compositeResult.setLeft(input);
return compositeResult;
}
- }));
+ }, MoreExecutors.directExecutor()));
results.add(Futures.transform(rightRead, new Function<Optional<R>, OptionalMutablePair<L, R>>() {
@Nullable
compositeResult.setRight(input);
return compositeResult;
}
- }));
+ }, MoreExecutors.directExecutor()));
return Futures.transform(Futures.successfulAsList(results),
new Function<List<?>, OptionalMutablePair<L, R>>() {
public OptionalMutablePair<L, R> apply(@Nullable final List<?> input) {
return compositeResult;
}
- });
+ }, MoreExecutors.directExecutor());
}
public static <K, V> ListenableFuture<Pair<K, V>> wrapToPair(
public Pair<K, V> apply(@Nullable final Optional<V> input) {
return new MutablePair<>(keyItem, input.orNull());
}
- });
+ }, MoreExecutors.directExecutor());
}
public static <V> ListenableFuture<Optional<V>> wrapToOptional(final ListenableFuture<V> value) {
public Optional<V> apply(@Nullable final V input) {
return Optional.fromNullable(input);
}
- });
+ }, MoreExecutors.directExecutor());
}
public static Ordering<EndpointGroupId> createEndpointGroupIdOrdering() {
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
return Optional.ofNullable(input)
.flatMap(i -> i.map(sgtName -> buildTemplate(sgt, iseContext.getIseSourceConfig().getTenant(), sgtName)));
}
- });
+ }, MoreExecutors.directExecutor());
}
private EndpointPolicyTemplateBySgt buildTemplate(final @Nonnull Sgt sgt, final @Nonnull TenantId tenantId,
public void onFailure(final Throwable t) {
LOG.debug("[epPolicyTemplateProvider] harvestAll FAILED: {}", t.getMessage());
}
- });
+ }, MoreExecutors.directExecutor());
return Futures.transform(sgtUpdateFu, new Function<Collection<SgtInfo>, Optional<String>>() {
@Nullable
.filter(sgtInfo -> sgt.equals(sgtInfo.getSgt())).findFirst()
.map(SgtInfo::getName));
}
- });
+ }, MoreExecutors.directExecutor());
}
private Optional<IseContext> findIseSourceConfigBySgt(final Sgt sgt) {
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.text.SimpleDateFormat;
import java.util.Collection;
LOG.debug("ise harvest failed", t);
storeOutcome(false, 0, t.getMessage());
}
- });
+ }, MoreExecutors.directExecutor());
try {
harvestResult.get(30, TimeUnit.SECONDS);
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.WebResource;
LOG.debug("entering stg-info processor {}", processor.getClass().getSimpleName());
return processor.processSgtInfo(iseSourceConfig.getTenant(), sgtInfos);
}
- });
+ }, MoreExecutors.directExecutor());
}
result = Futures.transform(processingResult, new Function<Void, Collection<SgtInfo>>() {
@Nullable
// always success, otherwise there will be TransactionCommitFailedException thrown
return sgtInfos;
}
- });
+ }, MoreExecutors.directExecutor());
} catch (Exception e) {
LOG.debug("failed to harvest ise", e);
result = Futures.immediateFailedFuture(e);