import static java.util.Objects.requireNonNull;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.IOException;
import java.io.InputStream;
-import java.util.List;
+import java.util.Optional;
import javax.ws.rs.Consumes;
import javax.ws.rs.Encoded;
import javax.ws.rs.POST;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
-import org.eclipse.jdt.annotation.NonNull;
-import org.opendaylight.mdsal.dom.api.DOMMountPoint;
import org.opendaylight.mdsal.dom.api.DOMMountPointService;
-import org.opendaylight.mdsal.dom.api.DOMRpcException;
-import org.opendaylight.mdsal.dom.api.DOMRpcResult;
-import org.opendaylight.mdsal.dom.api.DOMRpcService;
-import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
+import org.opendaylight.restconf.common.errors.RestconfFuture;
import org.opendaylight.restconf.nb.rfc8040.MediaTypes;
+import org.opendaylight.restconf.nb.rfc8040.databind.DatabindContext;
import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider;
import org.opendaylight.restconf.nb.rfc8040.databind.JsonOperationInputBody;
import org.opendaylight.restconf.nb.rfc8040.databind.OperationInputBody;
import org.opendaylight.restconf.nb.rfc8040.databind.XmlOperationInputBody;
+import org.opendaylight.restconf.nb.rfc8040.legacy.InstanceIdentifierContext;
import org.opendaylight.restconf.nb.rfc8040.legacy.NormalizedNodePayload;
import org.opendaylight.restconf.nb.rfc8040.streams.StreamsConfiguration;
-import org.opendaylight.restconf.nb.rfc8040.utils.parser.ParserIdentifier;
import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotification;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscription;
import org.opendaylight.yangtools.yang.common.ErrorTag;
import org.opendaylight.yangtools.yang.common.ErrorType;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
-import org.opendaylight.yangtools.yang.common.YangConstants;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static final Logger LOG = LoggerFactory.getLogger(RestconfInvokeOperationsServiceImpl.class);
private final DatabindProvider databindProvider;
- private final DOMRpcService rpcService;
+ private final MdsalRestconfServer server;
+ @Deprecated(forRemoval = true)
private final DOMMountPointService mountPointService;
private final SubscribeToStreamUtil streamUtils;
- public RestconfInvokeOperationsServiceImpl(final DatabindProvider databindProvider, final DOMRpcService rpcService,
- final DOMMountPointService mountPointService, final StreamsConfiguration configuration) {
+ public RestconfInvokeOperationsServiceImpl(final DatabindProvider databindProvider,
+ final MdsalRestconfServer server, final DOMMountPointService mountPointService,
+ final StreamsConfiguration configuration) {
this.databindProvider = requireNonNull(databindProvider);
- this.rpcService = requireNonNull(rpcService);
+ this.server = requireNonNull(server);
this.mountPointService = requireNonNull(mountPointService);
streamUtils = configuration.useSSE() ? SubscribeToStreamUtil.serverSentEvents()
: SubscribeToStreamUtil.webSockets();
private void invokeRpc(final String identifier, final UriInfo uriInfo, final AsyncResponse ar,
final OperationInputBody body) {
- final var dataBind = databindProvider.currentContext();
- final var schemaContext = dataBind.modelContext();
- final var context = ParserIdentifier.toInstanceIdentifier(identifier, schemaContext, mountPointService);
+ final var databind = databindProvider.currentContext();
+ final var reqPath = server.bindRequestPath(databind, identifier);
final ContainerNode input;
try {
- input = body.toContainerNode(context.inference());
+ input = body.toContainerNode(reqPath.inference());
} catch (IOException e) {
LOG.debug("Error reading input", e);
throw new RestconfDocumentedException("Error parsing input: " + e.getMessage(), ErrorType.PROTOCOL,
ErrorTag.MALFORMED_MESSAGE, e);
}
- final var rpcName = context.getSchemaNode().getQName();
- final ListenableFuture<? extends DOMRpcResult> future;
- final var mountPoint = context.getMountPoint();
- if (mountPoint == null) {
- if (CreateDataChangeEventSubscription.QNAME.equals(rpcName)) {
- future = Futures.immediateFuture(CreateStreamUtil.createDataChangeNotifiStream(
- streamUtils.listenersBroker(), input, schemaContext));
- } else if (SubscribeDeviceNotification.QNAME.equals(rpcName)) {
- final String baseUrl = streamUtils.prepareUriByStreamName(uriInfo, "").toString();
- future = Futures.immediateFuture(CreateStreamUtil.createDeviceNotificationListener(baseUrl, input,
- streamUtils, mountPointService));
- } else {
- future = invokeRpc(input, rpcName, rpcService);
- }
- } else {
- future = invokeRpc(input, rpcName, mountPoint);
- }
-
- Futures.addCallback(future, new FutureCallback<DOMRpcResult>() {
+ Futures.addCallback(hackInvokeRpc(databind, reqPath, uriInfo, input), new FutureCallback<>() {
@Override
- public void onSuccess(final DOMRpcResult response) {
- final var errors = response.errors();
- if (!errors.isEmpty()) {
- LOG.debug("RpcError message {}", response.errors());
- ar.resume(new RestconfDocumentedException("RPCerror message ", null, response.errors()));
- return;
- }
-
- final ContainerNode resultData = response.value();
- if (resultData == null || resultData.isEmpty()) {
- ar.resume(Response.noContent().build());
- } else {
- ar.resume(Response.ok().entity(new NormalizedNodePayload(context.inference(), resultData)).build());
+ public void onSuccess(final Optional<ContainerNode> result) {
+ if (result.isPresent()) {
+ final var output = result.orElseThrow();
+ if (!output.isEmpty()) {
+ ar.resume(Response.ok().entity(new NormalizedNodePayload(reqPath.inference(), output)).build());
+ return;
+ }
}
+ ar.resume(Response.noContent().build());
}
@Override
}, MoreExecutors.directExecutor());
}
- /**
- * Invoking rpc via mount point.
- *
- * @param mountPoint mount point
- * @param data input data
- * @param rpc RPC type
- * @return {@link DOMRpcResult}
- */
- @VisibleForTesting
- static ListenableFuture<? extends DOMRpcResult> invokeRpc(final ContainerNode data, final QName rpc,
- final DOMMountPoint mountPoint) {
- return invokeRpc(data, rpc, mountPoint.getService(DOMRpcService.class).orElseThrow(() -> {
- final String errmsg = "RPC service is missing.";
- LOG.debug(errmsg);
- return new RestconfDocumentedException(errmsg);
- }));
- }
-
- /**
- * Invoke rpc.
- *
- * @param input input data
- * @param rpc RPC type
- * @param rpcService rpc service to invoke rpc
- * @return {@link DOMRpcResult}
- */
- @VisibleForTesting
- static ListenableFuture<? extends DOMRpcResult> invokeRpc(final ContainerNode input, final QName rpc,
- final DOMRpcService rpcService) {
- return Futures.catching(rpcService.invokeRpc(rpc, nonnullInput(rpc, input)), DOMRpcException.class,
- cause -> new DefaultDOMRpcResult(List.of(RpcResultBuilder.newError(ErrorType.RPC, ErrorTag.OPERATION_FAILED,
- cause.getMessage()))),
- MoreExecutors.directExecutor());
- }
+ private RestconfFuture<Optional<ContainerNode>> hackInvokeRpc(final DatabindContext localDatabind,
+ final InstanceIdentifierContext reqPath, final UriInfo uriInfo, final ContainerNode input) {
+ // RPC type
+ final var type = reqPath.getSchemaNode().getQName();
+ final var mountPoint = reqPath.getMountPoint();
+ if (mountPoint == null) {
+ // Hacked-up integration of streams
+ if (CreateDataChangeEventSubscription.QNAME.equals(type)) {
+ return RestconfFuture.of(Optional.of(CreateStreamUtil.createDataChangeNotifiStream(
+ streamUtils.listenersBroker(), input, localDatabind.modelContext())));
+ } else if (SubscribeDeviceNotification.QNAME.equals(type)) {
+ final var baseUrl = streamUtils.prepareUriByStreamName(uriInfo, "").toString();
+ return RestconfFuture.of(Optional.of(CreateStreamUtil.createDeviceNotificationListener(baseUrl, input,
+ streamUtils, mountPointService)));
+ }
+ }
- private static @NonNull ContainerNode nonnullInput(final QName type, final ContainerNode input) {
- return input != null ? input
- : ImmutableNodes.containerNode(YangConstants.operationInputQName(type.getModule()));
+ return server.getRestconfStrategy(reqPath.getSchemaContext(), mountPoint).invokeRpc(type, input);
}
}