X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=restconf%2Fsal-rest-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Fsal%2Frestconf%2Fimpl%2FRestconfImpl.java;h=a7e2161702ed88ab184c6d1f3bd5b8a087cef264;hb=49a2bd9c0c10ea3356aba72284a5f2ab4661966e;hp=493dbb99c433f2f1d510f5221f12f25b518d1fae;hpb=6b2e2722172148449f7c50666543e065a919c790;p=netconf.git diff --git a/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/restconf/impl/RestconfImpl.java b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/restconf/impl/RestconfImpl.java index 493dbb99c4..a7e2161702 100644 --- a/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/restconf/impl/RestconfImpl.java +++ b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/restconf/impl/RestconfImpl.java @@ -11,28 +11,33 @@ package org.opendaylight.netconf.sal.restconf.impl; import com.google.common.base.CharMatcher; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.base.Predicates; import com.google.common.base.Splitter; import com.google.common.base.Strings; -import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; -import java.math.BigInteger; import java.net.URI; import java.net.URISyntaxException; +import java.text.DateFormat; import java.text.ParseException; import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.CancellationException; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import javax.ws.rs.core.Context; import javax.ws.rs.core.Response; @@ -56,8 +61,11 @@ import org.opendaylight.netconf.sal.rest.api.RestconfService; import org.opendaylight.netconf.sal.restconf.impl.RestconfError.ErrorTag; import org.opendaylight.netconf.sal.restconf.impl.RestconfError.ErrorType; import org.opendaylight.netconf.sal.streams.listeners.ListenerAdapter; +import org.opendaylight.netconf.sal.streams.listeners.NotificationListenerAdapter; import org.opendaylight.netconf.sal.streams.listeners.Notificator; import org.opendaylight.netconf.sal.streams.websockets.WebSocketServer; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime; +import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.common.QNameModule; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -67,17 +75,20 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgum import org.opendaylight.yangtools.yang.data.api.schema.AugmentationNode; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; +import org.opendaylight.yangtools.yang.data.api.schema.LeafNode; import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode; +import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.MapNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.api.schema.tree.ModifiedNodeDoesNotExistException; import org.opendaylight.yangtools.yang.data.impl.schema.Builders; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder; import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeAttrBuilder; import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.ListNodeBuilder; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.NormalizedNodeAttrBuilder; import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder; import org.opendaylight.yangtools.yang.model.api.ContainerSchemaNode; import org.opendaylight.yangtools.yang.model.api.DataNodeContainer; import org.opendaylight.yangtools.yang.model.api.DataSchemaNode; @@ -86,10 +97,12 @@ import org.opendaylight.yangtools.yang.model.api.LeafListSchemaNode; import org.opendaylight.yangtools.yang.model.api.LeafSchemaNode; import org.opendaylight.yangtools.yang.model.api.ListSchemaNode; import org.opendaylight.yangtools.yang.model.api.Module; +import org.opendaylight.yangtools.yang.model.api.NotificationDefinition; import org.opendaylight.yangtools.yang.model.api.RpcDefinition; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.opendaylight.yangtools.yang.model.api.SchemaNode; import org.opendaylight.yangtools.yang.model.api.SchemaPath; +import org.opendaylight.yangtools.yang.parser.stmt.rfc6020.effective.EffectiveSchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -124,6 +137,8 @@ public class RestconfImpl implements RestconfService { private static final String SCOPE_PARAM_NAME = "scope"; + private static final String OUTPUT_TYPE_PARAM_NAME = "notification-output-type"; + private static final String NETCONF_BASE = "urn:ietf:params:xml:ns:netconf:base:1.0"; private static final String NETCONF_BASE_PAYLOAD_NAME = "data"; @@ -134,14 +149,23 @@ public class RestconfImpl implements RestconfService { private static final YangInstanceIdentifier.AugmentationIdentifier SAL_REMOTE_AUG_IDENTIFIER; + public static final CharSequence DATA_SUBSCR = "data-change-event-subscription"; + private static final CharSequence CREATE_DATA_SUBSCR = "create-" + DATA_SUBSCR; + + public static final CharSequence NOTIFICATION_STREAM = "notification-stream"; + private static final CharSequence CREATE_NOTIFICATION_STREAM = "create-" + NOTIFICATION_STREAM; + static { try { final Date eventSubscriptionAugRevision = new SimpleDateFormat("yyyy-MM-dd").parse("2014-07-08"); - NETCONF_BASE_QNAME = QName.create(QNameModule.create(new URI(NETCONF_BASE), null), NETCONF_BASE_PAYLOAD_NAME ); + NETCONF_BASE_QNAME = QName.create(QNameModule.create(new URI(NETCONF_BASE), null), + NETCONF_BASE_PAYLOAD_NAME); SAL_REMOTE_AUGMENT = QNameModule.create(NAMESPACE_EVENT_SUBSCRIPTION_AUGMENT, eventSubscriptionAugRevision); - SAL_REMOTE_AUG_IDENTIFIER = new YangInstanceIdentifier.AugmentationIdentifier(Sets.newHashSet(QName.create(SAL_REMOTE_AUGMENT, "scope"), - QName.create(SAL_REMOTE_AUGMENT, "datastore"))); + SAL_REMOTE_AUG_IDENTIFIER = new YangInstanceIdentifier.AugmentationIdentifier(Sets.newHashSet( + QName.create(SAL_REMOTE_AUGMENT, "scope"), + QName.create(SAL_REMOTE_AUGMENT, "datastore"), + QName.create(SAL_REMOTE_AUGMENT, "notification-output-type"))); } catch (final ParseException e) { final String errMsg = "It wasn't possible to convert revision date of sal-remote-augment to date"; LOG.debug(errMsg); @@ -201,7 +225,8 @@ public class RestconfImpl implements RestconfService { throw new RestconfDocumentedException(errMsg, ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE); } - final InstanceIdentifierContext mountPointIdentifier = this.controllerContext.toMountPointIdentifier(identifier); + final InstanceIdentifierContext mountPointIdentifier = this.controllerContext + .toMountPointIdentifier(identifier); final DOMMountPoint mountPoint = mountPointIdentifier.getMountPoint(); final Set modules = this.controllerContext.getAllModules(mountPoint); final MapNode mountPointModulesMap = makeModuleMapNode(modules); @@ -228,7 +253,8 @@ public class RestconfImpl implements RestconfService { DOMMountPoint mountPoint = null; final SchemaContext schemaContext; if (identifier.contains(ControllerContext.MOUNT)) { - final InstanceIdentifierContext mountPointIdentifier = this.controllerContext.toMountPointIdentifier(identifier); + final InstanceIdentifierContext mountPointIdentifier = this.controllerContext + .toMountPointIdentifier(identifier); mountPoint = mountPointIdentifier.getMountPoint(); module = this.controllerContext.findModuleByNameAndRevision(mountPoint, moduleNameAndRevision); schemaContext = mountPoint.getSchemaContext(); @@ -261,8 +287,8 @@ public class RestconfImpl implements RestconfService { final SchemaContext schemaContext = this.controllerContext.getGlobalSchema(); final Set availableStreams = Notificator.getStreamNames(); final Module restconfModule = getRestconfModule(); - final DataSchemaNode streamSchemaNode = this.controllerContext.getRestconfModuleRestConfSchemaNode(restconfModule, - Draft02.RestConfModule.STREAM_LIST_SCHEMA_NODE); + final DataSchemaNode streamSchemaNode = this.controllerContext + .getRestconfModuleRestConfSchemaNode(restconfModule, Draft02.RestConfModule.STREAM_LIST_SCHEMA_NODE); Preconditions.checkState(streamSchemaNode instanceof ListSchemaNode); final CollectionNodeBuilder listStreamsBuilder = Builders @@ -296,22 +322,64 @@ public class RestconfImpl implements RestconfService { Set modules = null; DOMMountPoint mountPoint = null; if (identifier.contains(ControllerContext.MOUNT)) { - final InstanceIdentifierContext mountPointIdentifier = this.controllerContext.toMountPointIdentifier(identifier); + final InstanceIdentifierContext mountPointIdentifier = this.controllerContext + .toMountPointIdentifier(identifier); mountPoint = mountPointIdentifier.getMountPoint(); modules = this.controllerContext.getAllModules(mountPoint); } else { - final String errMsg = "URI has bad format. If operations behind mount point should be showed, URI has to end with "; + final String errMsg = "URI has bad format. If operations behind mount point should be showed, URI has to " + + "end with "; LOG.debug(errMsg + ControllerContext.MOUNT + " for " + identifier); - throw new RestconfDocumentedException(errMsg + ControllerContext.MOUNT, ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE); + throw new RestconfDocumentedException(errMsg + ControllerContext.MOUNT, ErrorType.PROTOCOL, + ErrorTag.INVALID_VALUE); } return operationsFromModulesToNormalizedContext(modules, mountPoint); } + /** + * Special case only for GET restconf/operations use (since moment of pre-Beryllium + * Yang parser and Yang model API removal). The method is creating fake + * schema context with fake module and fake data by use own implementations + * of schema nodes and module. + * + * @param modules + * - set of modules for get RPCs from every module + * @param mountPoint + * - mount point, if in use otherwise null + * @return {@link NormalizedNodeContext} + */ private NormalizedNodeContext operationsFromModulesToNormalizedContext(final Set modules, final DOMMountPoint mountPoint) { - throw new UnsupportedOperationException(); + + final ContainerSchemaNodeImpl fakeCont = new ContainerSchemaNodeImpl(); + final List> listRpcNodes = new ArrayList<>(); + for (final Module m : modules) { + for (final RpcDefinition rpc : m.getRpcs()) { + + final LeafSchemaNode fakeLeaf = new LeafSchemaNodeImpl(fakeCont.getPath(), + QName.create(ModuleImpl.moduleQName, m.getName() + ":" + rpc.getQName().getLocalName())); + fakeCont.addNodeChild(fakeLeaf); + listRpcNodes.add(Builders.leafBuilder(fakeLeaf).build()); + } + } + final ContainerSchemaNode fakeContSchNode = fakeCont; + final DataContainerNodeAttrBuilder containerBuilder = Builders + .containerBuilder(fakeContSchNode); + + for (final LeafNode rpcNode : listRpcNodes) { + containerBuilder.withChild(rpcNode); + } + + final Module fakeModule = new ModuleImpl(fakeContSchNode); + + final Set fakeModules = new HashSet<>(); + fakeModules.add(fakeModule); + final SchemaContext fakeSchemaCtx = EffectiveSchemaContext.resolveSchemaContext(fakeModules); + final InstanceIdentifierContext instanceIdentifierContext = + new InstanceIdentifierContext<>(null, fakeContSchNode, mountPoint, fakeSchemaCtx); + return new NormalizedNodeContext(instanceIdentifierContext, containerBuilder.build()); } private Module getRestconfModule() { @@ -362,7 +430,8 @@ public class RestconfImpl implements RestconfService { } @Override - public NormalizedNodeContext invokeRpc(final String identifier, final NormalizedNodeContext payload, final UriInfo uriInfo) { + public NormalizedNodeContext invokeRpc(final String identifier, final NormalizedNodeContext payload, + final UriInfo uriInfo) { final SchemaPath type = payload.getInstanceIdentifierContext().getSchemaNode().getPath(); final URI namespace = payload.getInstanceIdentifierContext().getSchemaNode().getQName().getNamespace(); final CheckedFuture response; @@ -378,7 +447,15 @@ public class RestconfImpl implements RestconfService { response = mountRpcServices.get().invokeRpc(type, payload.getData()); } else { if (namespace.toString().equals(SAL_REMOTE_NAMESPACE)) { - response = invokeSalRemoteRpcSubscribeRPC(payload); + if (identifier.contains(CREATE_DATA_SUBSCR)) { + response = invokeSalRemoteRpcSubscribeRPC(payload); + } else if (identifier.contains(CREATE_NOTIFICATION_STREAM)) { + response = invokeSalRemoteRpcNotifiStrRPC(payload); + } else { + final String msg = "Not supported operation"; + LOG.warn(msg); + throw new RestconfDocumentedException(msg, ErrorType.RPC, ErrorTag.OPERATION_NOT_SUPPORTED); + } } else { response = this.broker.invokeRpc(type, payload.getData()); } @@ -425,11 +502,14 @@ public class RestconfImpl implements RestconfService { throw new RestconfDocumentedException(cause.getMessage(), ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE); } else if (cause instanceof DOMRpcImplementationNotAvailableException) { - throw new RestconfDocumentedException(cause.getMessage(), ErrorType.APPLICATION, ErrorTag.OPERATION_NOT_SUPPORTED); + throw new RestconfDocumentedException(cause.getMessage(), ErrorType.APPLICATION, + ErrorTag.OPERATION_NOT_SUPPORTED); } - throw new RestconfDocumentedException("The operation encountered an unexpected error while executing.",cause); + throw new RestconfDocumentedException("The operation encountered an unexpected error while executing.", + cause); } else { - throw new RestconfDocumentedException("The operation encountered an unexpected error while executing.",e); + throw new RestconfDocumentedException("The operation encountered an unexpected error while executing.", + e); } } catch (final CancellationException e) { final String errMsg = "The operation was cancelled while executing."; @@ -446,15 +526,10 @@ public class RestconfImpl implements RestconfService { // did not expect any input throw new RestconfDocumentedException("No input expected.", ErrorType.PROTOCOL, ErrorTag.MALFORMED_MESSAGE); } - // else - // { - // TODO: Validate "mandatory" and "config" values here??? Or should those be - // those be - // validate in a more central location inside MD-SAL core. - // } } - private CheckedFuture invokeSalRemoteRpcSubscribeRPC(final NormalizedNodeContext payload) { + private CheckedFuture + invokeSalRemoteRpcSubscribeRPC(final NormalizedNodeContext payload) { final ContainerNode value = (ContainerNode) payload.getData(); final QName rpcQName = payload.getInstanceIdentifierContext().getSchemaNode().getQName(); final Optional> path = value.getChild(new NodeIdentifier( @@ -468,16 +543,23 @@ public class RestconfImpl implements RestconfService { } final YangInstanceIdentifier pathIdentifier = ((YangInstanceIdentifier) pathValue); - String streamName = null; + String streamName = (String) CREATE_DATA_SUBSCR; + NotificationOutputType outputType = null; if (!pathIdentifier.isEmpty()) { - final String fullRestconfIdentifier = this.controllerContext.toFullRestconfIdentifier(pathIdentifier, null); + final String fullRestconfIdentifier = DATA_SUBSCR + + this.controllerContext.toFullRestconfIdentifier(pathIdentifier, null); - LogicalDatastoreType datastore = parseEnumTypeParameter(value, LogicalDatastoreType.class, DATASTORE_PARAM_NAME); + LogicalDatastoreType datastore = + parseEnumTypeParameter(value, LogicalDatastoreType.class, DATASTORE_PARAM_NAME); datastore = datastore == null ? DEFAULT_DATASTORE : datastore; DataChangeScope scope = parseEnumTypeParameter(value, DataChangeScope.class, SCOPE_PARAM_NAME); scope = scope == null ? DEFAULT_SCOPE : scope; + outputType = parseEnumTypeParameter(value, NotificationOutputType.class, + OUTPUT_TYPE_PARAM_NAME); + outputType = outputType == null ? NotificationOutputType.XML : outputType; + streamName = Notificator.createStreamNameFromUri(fullRestconfIdentifier + "/datastore=" + datastore + "/scope=" + scope); } @@ -491,11 +573,12 @@ public class RestconfImpl implements RestconfService { final QName outputQname = QName.create(rpcQName, "output"); final QName streamNameQname = QName.create(rpcQName, "stream-name"); - final ContainerNode output = ImmutableContainerNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(outputQname)) - .withChild(ImmutableNodes.leafNode(streamNameQname, streamName)).build(); + final ContainerNode output = + ImmutableContainerNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(outputQname)) + .withChild(ImmutableNodes.leafNode(streamNameQname, streamName)).build(); if (!Notificator.existListenerFor(streamName)) { - Notificator.createListener(pathIdentifier, streamName); + Notificator.createListener(pathIdentifier, streamName, outputType); } final DOMRpcResult defaultDOMRpcResult = new DefaultDOMRpcResult(output); @@ -651,36 +734,67 @@ public class RestconfImpl implements RestconfService { * failures back to the client and forcing them to handle it via retry (and having to * document the behavior). */ - int tries = 2; + PutResult result = null; + final TryOfPutData tryPutData = new TryOfPutData(); while(true) { - try { - if (mountPoint != null) { - this.broker.commitConfigurationDataPut(mountPoint, normalizedII, payload.getData()).checkedGet(); - } else { - this.broker.commitConfigurationDataPut(this.controllerContext.getGlobalSchema(), normalizedII, payload.getData()).checkedGet(); - } + if (mountPoint != null) { - break; - } catch (final TransactionCommitFailedException e) { - if(e instanceof OptimisticLockFailedException) { - if(--tries <= 0) { - LOG.debug("Got OptimisticLockFailedException on last try - failing " + identifier); - throw new RestconfDocumentedException(e.getMessage(), e, e.getErrorList()); - } + result = this.broker.commitMountPointDataPut(mountPoint, normalizedII, payload.getData()); + } else { + result = this.broker.commitConfigurationDataPut(this.controllerContext.getGlobalSchema(), normalizedII, + payload.getData()); + } + final CountDownLatch waiter = new CountDownLatch(1); + Futures.addCallback(result.getFutureOfPutData(), new FutureCallback() { - LOG.debug("Got OptimisticLockFailedException - trying again " + identifier); - } else { - LOG.debug("Update ConfigDataStore fail " + identifier, e); - throw new RestconfDocumentedException(e.getMessage(), e, e.getErrorList()); + @Override + public void onSuccess(final Void result) { + handlingLoggerPut(null, tryPutData, identifier); + waiter.countDown(); } + + @Override + public void onFailure(final Throwable t) { + waiter.countDown(); + handlingLoggerPut(t, tryPutData, identifier); + } + }); + + try { + waiter.await(); } catch (final Exception e) { - final String errMsg = "Error updating data "; - LOG.debug(errMsg + identifier, e); - throw new RestconfDocumentedException(errMsg, e); + final String msg = "Problem while waiting for response"; + LOG.warn(msg); + throw new RestconfDocumentedException(msg, e); + } + + if(tryPutData.isDone()){ + break; + } else { + throw new RestconfDocumentedException("Problem while PUT operations"); } } - return Response.status(Status.OK).build(); + return Response.status(result.getStatus()).build(); + } + + protected void handlingLoggerPut(final Throwable t, final TryOfPutData tryPutData, final String identifier) { + if (t != null) { + if (t instanceof OptimisticLockFailedException) { + if (tryPutData.countGet() <= 0) { + LOG.debug("Got OptimisticLockFailedException on last try - failing " + identifier); + throw new RestconfDocumentedException(t.getMessage(), t); + } + LOG.debug("Got OptimisticLockFailedException - trying again " + identifier); + tryPutData.countDown(); + } else { + LOG.debug("Update ConfigDataStore fail " + identifier, t); + throw new RestconfDocumentedException(t.getMessage(), t); + } + } else { + LOG.trace("PUT Successful " + identifier); + tryPutData.done(); + } } private static void validateTopLevelNodeName(final NormalizedNodeContext node, @@ -742,14 +856,16 @@ public class RestconfImpl implements RestconfService { if ( ! uriKeyValue.equals(dataKeyValue)) { final String errMsg = "The value '" + uriKeyValue + "' for key '" + keyDefinition.getLocalName() + - "' specified in the URI doesn't match the value '" + dataKeyValue + "' specified in the message body. "; + "' specified in the URI doesn't match the value '" + dataKeyValue + + "' specified in the message body. "; throw new RestconfDocumentedException(errMsg, ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE); } } } @Override - public Response createConfigurationData(final String identifier, final NormalizedNodeContext payload, final UriInfo uriInfo) { + public Response createConfigurationData(final String identifier, final NormalizedNodeContext payload, + final UriInfo uriInfo) { return createConfigurationData(payload, uriInfo); } @@ -801,18 +917,37 @@ public class RestconfImpl implements RestconfService { final DOMMountPoint mountPoint = payload.getInstanceIdentifierContext().getMountPoint(); final InstanceIdentifierContext iiWithData = payload.getInstanceIdentifierContext(); final YangInstanceIdentifier normalizedII = iiWithData.getInstanceIdentifier(); - try { - if (mountPoint != null) { - this.broker.commitConfigurationDataPost(mountPoint, normalizedII, payload.getData()).checkedGet(); - } else { - this.broker.commitConfigurationDataPost(this.controllerContext.getGlobalSchema(), normalizedII, payload.getData()).checkedGet(); + + CheckedFuture future; + if (mountPoint != null) { + future = this.broker.commitConfigurationDataPost(mountPoint, normalizedII, payload.getData()); + } else { + future = this.broker.commitConfigurationDataPost(this.controllerContext.getGlobalSchema(), normalizedII, + payload.getData()); + } + + final CountDownLatch waiter = new CountDownLatch(1); + Futures.addCallback(future, new FutureCallback() { + + @Override + public void onSuccess(final Void result) { + handlerLoggerPost(null, uriInfo); + waiter.countDown(); + } + + @Override + public void onFailure(final Throwable t) { + waiter.countDown(); + handlerLoggerPost(t, uriInfo); } - } catch(final RestconfDocumentedException e) { - throw e; + }); + + try { + waiter.await(); } catch (final Exception e) { - final String errMsg = "Error creating data "; - LOG.info(errMsg + (uriInfo != null ? uriInfo.getPath() : ""), e); - throw new RestconfDocumentedException(errMsg, e); + final String msg = "Problem while waiting for response"; + LOG.warn(msg); + throw new RestconfDocumentedException(msg, e); } final ResponseBuilder responseBuilder = Response.status(Status.NO_CONTENT); @@ -824,7 +959,18 @@ public class RestconfImpl implements RestconfService { return responseBuilder.build(); } - private URI resolveLocation(final UriInfo uriInfo, final String uriBehindBase, final DOMMountPoint mountPoint, final YangInstanceIdentifier normalizedII) { + protected void handlerLoggerPost(final Throwable t, final UriInfo uriInfo) { + if (t != null) { + final String errMsg = "Error creating data "; + LOG.warn(errMsg + (uriInfo != null ? uriInfo.getPath() : ""), t); + throw new RestconfDocumentedException(errMsg, t); + } else { + LOG.trace("Successfuly create data."); + } + } + + private URI resolveLocation(final UriInfo uriInfo, final String uriBehindBase, final DOMMountPoint mountPoint, + final YangInstanceIdentifier normalizedII) { if(uriInfo == null) { // This is null if invoked internally return null; @@ -847,25 +993,51 @@ public class RestconfImpl implements RestconfService { final DOMMountPoint mountPoint = iiWithData.getMountPoint(); final YangInstanceIdentifier normalizedII = iiWithData.getInstanceIdentifier(); - try { - if (mountPoint != null) { - this.broker.commitConfigurationDataDelete(mountPoint, normalizedII); - } else { - this.broker.commitConfigurationDataDelete(normalizedII).get(); + final CheckedFuture future; + if (mountPoint != null) { + future = this.broker.commitConfigurationDataDelete(mountPoint, normalizedII); + } else { + future = this.broker.commitConfigurationDataDelete(normalizedII); + } + + final CountDownLatch waiter = new CountDownLatch(1); + Futures.addCallback(future, new FutureCallback() { + + @Override + public void onSuccess(final Void result) { + handlerLoggerDelete(null); + waiter.countDown(); } - } catch (final Exception e) { - final Optional searchedException = Iterables.tryFind(Throwables.getCausalChain(e), - Predicates.instanceOf(ModifiedNodeDoesNotExistException.class)); - if (searchedException.isPresent()) { - throw new RestconfDocumentedException("Data specified for deleting doesn't exist.", ErrorType.APPLICATION, ErrorTag.DATA_MISSING); + + @Override + public void onFailure(final Throwable t) { + waiter.countDown(); + handlerLoggerDelete(t); } - final String errMsg = "Error while deleting data"; - LOG.info(errMsg, e); - throw new RestconfDocumentedException(errMsg, e); + + }); + + try { + waiter.await(); + } catch (final Exception e) { + final String msg = "Problem while waiting for response"; + LOG.warn(msg); + throw new RestconfDocumentedException(msg, e); } + return Response.status(Status.OK).build(); } + protected void handlerLoggerDelete(final Throwable t) { + if (t != null) { + final String errMsg = "Error while deleting data"; + LOG.info(errMsg, t); + throw new RestconfDocumentedException(errMsg, t); + } else { + LOG.trace("Successfuly delete data."); + } + } + /** * Subscribes to some path in schema context (stream) to listen on changes on this stream. * @@ -876,7 +1048,165 @@ public class RestconfImpl implements RestconfService { * */ @Override - public Response subscribeToStream(final String identifier, final UriInfo uriInfo) { + public NormalizedNodeContext subscribeToStream(final String identifier, final UriInfo uriInfo) { + boolean startTime_used = false; + boolean stopTime_used = false; + Date start = null; + Date stop = null; + + for (final Entry> entry : uriInfo.getQueryParameters().entrySet()) { + switch (entry.getKey()) { + case "start-time": + if (!startTime_used) { + startTime_used = true; + start = parseDateFromQueryParam(entry); + } else { + throw new RestconfDocumentedException("Start-time parameter can be used only once."); + } + break; + case "stop-time": + if (!stopTime_used) { + stopTime_used = true; + stop = parseDateFromQueryParam(entry); + } else { + throw new RestconfDocumentedException("Stop-time parameter can be used only once."); + } + break; + default: + throw new RestconfDocumentedException("Bad parameter used with notifications: " + entry.getKey()); + } + } + if(!startTime_used && stopTime_used){ + throw new RestconfDocumentedException("Stop-time parameter has to be used with start-time parameter."); + } + URI response = null; + if (identifier.contains(DATA_SUBSCR)) { + response = dataSubs(identifier, uriInfo, start, stop); + } else if (identifier.contains(NOTIFICATION_STREAM)) { + response = notifStream(identifier, uriInfo, start, stop); + } + + if(response != null){ + // prepare node with value of location + final InstanceIdentifierContext iid = prepareIIDSubsStreamOutput(); + final NormalizedNodeAttrBuilder> builder = ImmutableLeafNodeBuilder + .create().withValue(response.toString()); + builder.withNodeIdentifier( + NodeIdentifier.create(QName.create("subscribe:to:notification", "2016-10-28", "location"))); + + // prepare new header with location + final Map headers = new HashMap<>(); + headers.put("Location", response); + + return new NormalizedNodeContext(iid, builder.build(), headers); + } + + final String msg = "Bad type of notification of sal-remote"; + LOG.warn(msg); + throw new RestconfDocumentedException(msg); + } + + private Date parseDateFromQueryParam(final Entry> entry) { + final DateAndTime event = new DateAndTime(entry.getValue().iterator().next()); + String numOf_ms = ""; + final String value = event.getValue(); + if (value.contains(".")) { + numOf_ms = numOf_ms + "."; + final int lastChar = value.contains("Z") ? value.indexOf("Z") : (value.contains("+") ? value.indexOf("+") + : (value.contains("-") ? value.indexOf("-") : value.length())); + for (int i = 0; i < (lastChar - value.indexOf(".") - 1); i++) { + numOf_ms = numOf_ms + "S"; + } + } + String zone = ""; + if (!value.contains("Z")) { + zone = zone + "XXX"; + } + final DateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss" + numOf_ms + zone); + + try { + return dateFormatter.parse(value.contains("Z") ? value.replace('T', ' ').substring(0, value.indexOf("Z")) + : value.replace('T', ' ')); + } catch (final ParseException e) { + throw new RestconfDocumentedException("Cannot parse of value in date: " + value + e); + } + } + + /** + * @return {@link InstanceIdentifierContext} of location leaf for + * notification + */ + private InstanceIdentifierContext prepareIIDSubsStreamOutput() { + final QName qnameBase = QName.create("subscribe:to:notification", "2016-10-28", "notifi"); + final SchemaContext schemaCtx = ControllerContext.getInstance().getGlobalSchema(); + final DataSchemaNode location = ((ContainerSchemaNode) schemaCtx + .findModuleByNamespaceAndRevision(qnameBase.getNamespace(), qnameBase.getRevision()) + .getDataChildByName(qnameBase)).getDataChildByName(QName.create(qnameBase, "location")); + final List path = new ArrayList<>(); + path.add(NodeIdentifier.create(qnameBase)); + path.add(NodeIdentifier.create(QName.create(qnameBase, "location"))); + + return new InstanceIdentifierContext(YangInstanceIdentifier.create(path), location, null, + schemaCtx); + } + + /** + * Register notification listener by stream name + * + * @param identifier + * - stream name + * @param uriInfo + * - uriInfo + * @param stop + * - stop-time of getting notification + * @param start + * - start-time of getting notification + * @return {@link Response} + */ + private URI notifStream(final String identifier, final UriInfo uriInfo, final Date start, final Date stop) { + final String streamName = Notificator.createStreamNameFromUri(identifier); + if (Strings.isNullOrEmpty(streamName)) { + throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE); + } + final List listeners = Notificator.getNotificationListenerFor(streamName); + if ((listeners == null) || listeners.isEmpty()) { + throw new RestconfDocumentedException("Stream was not found.", ErrorType.PROTOCOL, + ErrorTag.UNKNOWN_ELEMENT); + } + + for (final NotificationListenerAdapter listener : listeners) { + this.broker.registerToListenNotification(listener); + listener.setTime(start, stop); + } + + final UriBuilder uriBuilder = uriInfo.getAbsolutePathBuilder(); + int notificationPort = NOTIFICATION_PORT; + try { + final WebSocketServer webSocketServerInstance = WebSocketServer.getInstance(); + notificationPort = webSocketServerInstance.getPort(); + } catch (final NullPointerException e) { + WebSocketServer.createInstance(NOTIFICATION_PORT); + } + final UriBuilder uriToWebsocketServerBuilder = uriBuilder.port(notificationPort).scheme("ws"); + final URI uriToWebsocketServer = uriToWebsocketServerBuilder.replacePath(streamName).build(); + + return uriToWebsocketServer; + } + + /** + * Register data change listener by stream name + * + * @param identifier + * - stream name + * @param uriInfo + * - uri info + * @param stop + * - start-time of getting notification + * @param start + * - stop-time of getting notification + * @return {@link Response} + */ + private URI dataSubs(final String identifier, final UriInfo uriInfo, final Date start, final Date stop) { final String streamName = Notificator.createStreamNameFromUri(identifier); if (Strings.isNullOrEmpty(streamName)) { throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE); @@ -886,6 +1216,7 @@ public class RestconfImpl implements RestconfService { if (listener == null) { throw new RestconfDocumentedException("Stream was not found.", ErrorType.PROTOCOL, ErrorTag.UNKNOWN_ELEMENT); } + listener.setTimer(start, stop); final Map paramToValues = resolveValuesFromUri(identifier); final LogicalDatastoreType datastore = parserURIEnumParameter(LogicalDatastoreType.class, @@ -913,15 +1244,22 @@ public class RestconfImpl implements RestconfService { final UriBuilder uriToWebsocketServerBuilder = uriBuilder.port(notificationPort).scheme("ws"); final URI uriToWebsocketServer = uriToWebsocketServerBuilder.replacePath(streamName).build(); - return Response.status(Status.OK).location(uriToWebsocketServer).build(); + return uriToWebsocketServer; } @Override - public PATCHStatusContext patchConfigurationData(final String identifier, final PATCHContext context, final UriInfo uriInfo) { + public PATCHStatusContext patchConfigurationData(final String identifier, final PATCHContext context, + final UriInfo uriInfo) { if (context == null) { throw new RestconfDocumentedException("Input is required.", ErrorType.PROTOCOL, ErrorTag.MALFORMED_MESSAGE); } - return this.broker.patchConfigurationDataWithinTransaction(context, this.controllerContext.getGlobalSchema()); + + try { + return this.broker.patchConfigurationDataWithinTransaction(context); + } catch (final Exception e) { + LOG.debug("Patch transaction failed", e); + throw new RestconfDocumentedException(e.getMessage()); + } } @Override @@ -929,13 +1267,19 @@ public class RestconfImpl implements RestconfService { if (context == null) { throw new RestconfDocumentedException("Input is required.", ErrorType.PROTOCOL, ErrorTag.MALFORMED_MESSAGE); } - return this.broker.patchConfigurationDataWithinTransaction(context, this.controllerContext.getGlobalSchema()); + + try { + return this.broker.patchConfigurationDataWithinTransaction(context); + } catch (final Exception e) { + LOG.debug("Patch transaction failed", e); + throw new RestconfDocumentedException(e.getMessage()); + } } /** * Load parameter for subscribing to stream from input composite node * - * @param compNode + * @param value * contains value * @return enum object if its string value is equal to {@code paramName}. In other cases null. */ @@ -946,7 +1290,8 @@ public class RestconfImpl implements RestconfService { return null; } final Optional> enumNode = - ((AugmentationNode) augNode.get()).getChild(new NodeIdentifier(QName.create(SAL_REMOTE_AUGMENT, paramName))); + ((AugmentationNode) augNode.get()) + .getChild(new NodeIdentifier(QName.create(SAL_REMOTE_AUGMENT, paramName))); if (!enumNode.isPresent()) { return null; } @@ -995,11 +1340,6 @@ public class RestconfImpl implements RestconfService { return result; } - public BigInteger getOperationalReceived() { - // TODO Auto-generated method stub - return null; - } - private MapNode makeModuleMapNode(final Set modules) { Preconditions.checkNotNull(modules); final Module restconfModule = getRestconfModule(); @@ -1104,4 +1444,89 @@ public class RestconfImpl implements RestconfService { return streamNodeValues.build(); } + + /** + * Prepare stream for notification + * + * @param payload + * - contains list of qnames of notifications + * @return - checked future object + */ + private CheckedFuture invokeSalRemoteRpcNotifiStrRPC( + final NormalizedNodeContext payload) { + final ContainerNode data = (ContainerNode) payload.getData(); + LeafSetNode leafSet = null; + String outputType = "XML"; + for (final DataContainerChild dataChild : data.getValue()) { + if (dataChild instanceof LeafSetNode) { + leafSet = (LeafSetNode) dataChild; + } else if (dataChild instanceof AugmentationNode) { + outputType = (String) (((AugmentationNode) dataChild).getValue()).iterator().next().getValue(); + } + } + + final Collection entryNodes = leafSet.getValue(); + final List paths = new ArrayList<>(); + String streamName = CREATE_NOTIFICATION_STREAM + "/"; + + final Iterator iterator = entryNodes.iterator(); + while (iterator.hasNext()) { + final QName valueQName = QName.create((String) iterator.next().getValue()); + final Module module = ControllerContext.getInstance() + .findModuleByNamespace(valueQName.getModule().getNamespace()); + Preconditions.checkNotNull(module, "Module for namespace " + valueQName.getModule().getNamespace() + + " does not exist"); + NotificationDefinition notifiDef = null; + for (final NotificationDefinition notification : module.getNotifications()) { + if (notification.getQName().equals(valueQName)) { + notifiDef = notification; + break; + } + } + final String moduleName = module.getName(); + Preconditions.checkNotNull(notifiDef, + "Notification " + valueQName + "doesn't exist in module " + moduleName); + paths.add(notifiDef.getPath()); + streamName = streamName + moduleName + ":" + valueQName.getLocalName(); + if (iterator.hasNext()) { + streamName = streamName + ","; + } + } + + final QName rpcQName = payload.getInstanceIdentifierContext().getSchemaNode().getQName(); + final QName outputQname = QName.create(rpcQName, "output"); + final QName streamNameQname = QName.create(rpcQName, "notification-stream-identifier"); + + final ContainerNode output = ImmutableContainerNodeBuilder.create() + .withNodeIdentifier(new NodeIdentifier(outputQname)) + .withChild(ImmutableNodes.leafNode(streamNameQname, streamName)).build(); + + if (!Notificator.existNotificationListenerFor(streamName)) { + Notificator.createNotificationListener(paths, streamName, outputType); + } + + final DOMRpcResult defaultDOMRpcResult = new DefaultDOMRpcResult(output); + + return Futures.immediateCheckedFuture(defaultDOMRpcResult); + } + + private class TryOfPutData { + int tries = 2; + boolean done = false; + + void countDown() { + this.tries--; + } + + void done() { + this.done = true; + } + + boolean isDone() { + return this.done; + } + int countGet() { + return this.tries; + } + } }