import com.google.common.base.CharMatcher;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.google.common.base.Predicates;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
-import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import java.math.BigInteger;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import org.opendaylight.netconf.sal.restconf.impl.RestconfError.ErrorTag;
import org.opendaylight.netconf.sal.restconf.impl.RestconfError.ErrorType;
import org.opendaylight.netconf.sal.streams.listeners.ListenerAdapter;
+import org.opendaylight.netconf.sal.streams.listeners.NotificationListenerAdapter;
import org.opendaylight.netconf.sal.streams.listeners.Notificator;
import org.opendaylight.netconf.sal.streams.websockets.WebSocketServer;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.schema.AugmentationNode;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.ModifiedNodeDoesNotExistException;
import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
import org.opendaylight.yangtools.yang.model.api.LeafSchemaNode;
import org.opendaylight.yangtools.yang.model.api.ListSchemaNode;
import org.opendaylight.yangtools.yang.model.api.Module;
+import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.model.api.SchemaNode;
import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.opendaylight.yangtools.yang.parser.stmt.rfc6020.effective.EffectiveSchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static final YangInstanceIdentifier.AugmentationIdentifier SAL_REMOTE_AUG_IDENTIFIER;
+ public static final CharSequence DATA_SUBSCR = "data-change-event-subscription";
+ private static final CharSequence CREATE_DATA_SUBSCR = "create-" + DATA_SUBSCR;
+
+ public static final CharSequence NOTIFICATION_STREAM = "notification-stream";
+ private static final CharSequence CREATE_NOTIFICATION_STREAM = "create-" + NOTIFICATION_STREAM;
+
static {
try {
final Date eventSubscriptionAugRevision = new SimpleDateFormat("yyyy-MM-dd").parse("2014-07-08");
return operationsFromModulesToNormalizedContext(modules, mountPoint);
}
+ /**
+ * Special case only for GET restconf/operations use (since moment of pre-Beryllium
+ * Yang parser and Yang model API removal). The method is creating fake
+ * schema context with fake module and fake data by use own implementations
+ * of schema nodes and module.
+ *
+ * @param modules
+ * - set of modules for get RPCs from every module
+ * @param mountPoint
+ * - mount point, if in use otherwise null
+ * @return {@link NormalizedNodeContext}
+ */
private NormalizedNodeContext operationsFromModulesToNormalizedContext(final Set<Module> modules,
final DOMMountPoint mountPoint) {
- throw new UnsupportedOperationException();
+
+ final ContainerSchemaNodeImpl fakeCont = new ContainerSchemaNodeImpl();
+ final List<LeafNode<Object>> listRpcNodes = new ArrayList<>();
+ for (final Module m : modules) {
+ for (final RpcDefinition rpc : m.getRpcs()) {
+
+ final LeafSchemaNode fakeLeaf = new LeafSchemaNodeImpl(fakeCont.getPath(),
+ QName.create(ModuleImpl.moduleQName, m.getName() + ":" + rpc.getQName().getLocalName()));
+ fakeCont.addNodeChild(fakeLeaf);
+ listRpcNodes.add(Builders.leafBuilder(fakeLeaf).build());
+ }
+ }
+ final ContainerSchemaNode fakeContSchNode = fakeCont;
+ final DataContainerNodeAttrBuilder<NodeIdentifier, ContainerNode> containerBuilder = Builders
+ .containerBuilder(fakeContSchNode);
+
+ for (final LeafNode<Object> rpcNode : listRpcNodes) {
+ containerBuilder.withChild(rpcNode);
+ }
+
+ final Module fakeModule = new ModuleImpl(fakeContSchNode);
+
+ final Set<Module> fakeModules = new HashSet<>();
+ fakeModules.add(fakeModule);
+ final SchemaContext fakeSchemaCtx = EffectiveSchemaContext.resolveSchemaContext(fakeModules);
+ final InstanceIdentifierContext<ContainerSchemaNode> instanceIdentifierContext = new InstanceIdentifierContext<>(
+ null, fakeContSchNode, mountPoint, fakeSchemaCtx);
+ return new NormalizedNodeContext(instanceIdentifierContext, containerBuilder.build());
}
private Module getRestconfModule() {
response = mountRpcServices.get().invokeRpc(type, payload.getData());
} else {
if (namespace.toString().equals(SAL_REMOTE_NAMESPACE)) {
- response = invokeSalRemoteRpcSubscribeRPC(payload);
+ if (identifier.contains(CREATE_DATA_SUBSCR)) {
+ response = invokeSalRemoteRpcSubscribeRPC(payload);
+ } else if (identifier.contains(CREATE_NOTIFICATION_STREAM)) {
+ response = invokeSalRemoteRpcNotifiStrRPC(payload);
+ } else {
+ final String msg = "Not supported operation";
+ LOG.warn(msg);
+ throw new RestconfDocumentedException(msg, ErrorType.RPC, ErrorTag.OPERATION_NOT_SUPPORTED);
+ }
} else {
response = this.broker.invokeRpc(type, payload.getData());
}
// did not expect any input
throw new RestconfDocumentedException("No input expected.", ErrorType.PROTOCOL, ErrorTag.MALFORMED_MESSAGE);
}
- // else
- // {
- // TODO: Validate "mandatory" and "config" values here??? Or should those be
- // those be
- // validate in a more central location inside MD-SAL core.
- // }
}
private CheckedFuture<DOMRpcResult, DOMRpcException> invokeSalRemoteRpcSubscribeRPC(final NormalizedNodeContext payload) {
}
final YangInstanceIdentifier pathIdentifier = ((YangInstanceIdentifier) pathValue);
- String streamName = null;
+ String streamName = (String) CREATE_DATA_SUBSCR;
if (!pathIdentifier.isEmpty()) {
- final String fullRestconfIdentifier = this.controllerContext.toFullRestconfIdentifier(pathIdentifier, null);
+ final String fullRestconfIdentifier = DATA_SUBSCR
+ + this.controllerContext.toFullRestconfIdentifier(pathIdentifier, null);
LogicalDatastoreType datastore = parseEnumTypeParameter(value, LogicalDatastoreType.class, DATASTORE_PARAM_NAME);
datastore = datastore == null ? DEFAULT_DATASTORE : datastore;
* failures back to the client and forcing them to handle it via retry (and having to
* document the behavior).
*/
- int tries = 2;
+ PutResult result = null;
+ final TryOfPutData tryPutData = new TryOfPutData();
while(true) {
- try {
- if (mountPoint != null) {
- this.broker.commitConfigurationDataPut(mountPoint, normalizedII, payload.getData()).checkedGet();
- } else {
- this.broker.commitConfigurationDataPut(this.controllerContext.getGlobalSchema(), normalizedII, payload.getData()).checkedGet();
+ if (mountPoint != null) {
+
+ result = this.broker.commitMountPointDataPut(mountPoint, normalizedII, payload.getData());
+ } else {
+ result = this.broker.commitConfigurationDataPut(this.controllerContext.getGlobalSchema(), normalizedII,
+ payload.getData());
+ }
+ final CountDownLatch waiter = new CountDownLatch(1);
+ Futures.addCallback(result.getFutureOfPutData(), new FutureCallback<Void>() {
+
+ @Override
+ public void onSuccess(final Void result) {
+ handlingLoggerPut(null, tryPutData, identifier);
+ waiter.countDown();
+ }
+
+ @Override
+ public void onFailure(final Throwable t) {
+ waiter.countDown();
+ handlingLoggerPut(t, tryPutData, identifier);
}
+ });
+ try {
+ waiter.await();
+ } catch (final InterruptedException e) {
+ final String msg = "Problem while waiting for response";
+ LOG.warn(msg);
+ throw new RestconfDocumentedException(msg, e);
+ }
+
+ if(tryPutData.isDone()){
break;
- } catch (final TransactionCommitFailedException e) {
- if(e instanceof OptimisticLockFailedException) {
- if(--tries <= 0) {
- LOG.debug("Got OptimisticLockFailedException on last try - failing " + identifier);
- throw new RestconfDocumentedException(e.getMessage(), e, e.getErrorList());
- }
+ } else {
+ throw new RestconfDocumentedException("Problem while PUT operations");
+ }
+ }
- LOG.debug("Got OptimisticLockFailedException - trying again " + identifier);
- } else {
- LOG.debug("Update ConfigDataStore fail " + identifier, e);
- throw new RestconfDocumentedException(e.getMessage(), e, e.getErrorList());
+ return Response.status(result.getStatus()).build();
+ }
+
+ protected void handlingLoggerPut(final Throwable t, final TryOfPutData tryPutData, final String identifier) {
+ if (t != null) {
+ if (t instanceof OptimisticLockFailedException) {
+ if (tryPutData.countGet() <= 0) {
+ LOG.debug("Got OptimisticLockFailedException on last try - failing " + identifier);
+ throw new RestconfDocumentedException(t.getMessage(), t);
}
- } catch (final Exception e) {
- final String errMsg = "Error updating data ";
- LOG.debug(errMsg + identifier, e);
- throw new RestconfDocumentedException(errMsg, e);
+ LOG.debug("Got OptimisticLockFailedException - trying again " + identifier);
+ tryPutData.countDown();
+ } else {
+ LOG.debug("Update ConfigDataStore fail " + identifier, t);
+ throw new RestconfDocumentedException(t.getMessage(), t);
}
+ } else {
+ LOG.trace("PUT Successful " + identifier);
+ tryPutData.done();
}
-
- return Response.status(Status.OK).build();
}
private static void validateTopLevelNodeName(final NormalizedNodeContext node,
final DOMMountPoint mountPoint = payload.getInstanceIdentifierContext().getMountPoint();
final InstanceIdentifierContext<?> iiWithData = payload.getInstanceIdentifierContext();
final YangInstanceIdentifier normalizedII = iiWithData.getInstanceIdentifier();
- try {
- if (mountPoint != null) {
- this.broker.commitConfigurationDataPost(mountPoint, normalizedII, payload.getData()).checkedGet();
- } else {
- this.broker.commitConfigurationDataPost(this.controllerContext.getGlobalSchema(), normalizedII, payload.getData()).checkedGet();
+
+ CheckedFuture<Void, TransactionCommitFailedException> future;
+ if (mountPoint != null) {
+ future = this.broker.commitConfigurationDataPost(mountPoint, normalizedII, payload.getData());
+ } else {
+ future = this.broker.commitConfigurationDataPost(this.controllerContext.getGlobalSchema(), normalizedII,
+ payload.getData());
+ }
+
+ final CountDownLatch waiter = new CountDownLatch(1);
+ Futures.addCallback(future, new FutureCallback<Void>() {
+
+ @Override
+ public void onSuccess(final Void result) {
+ handlerLoggerPost(null, uriInfo);
+ waiter.countDown();
}
- } catch(final RestconfDocumentedException e) {
- throw e;
- } catch (final Exception e) {
- final String errMsg = "Error creating data ";
- LOG.info(errMsg + (uriInfo != null ? uriInfo.getPath() : ""), e);
- throw new RestconfDocumentedException(errMsg, e);
+
+ @Override
+ public void onFailure(final Throwable t) {
+ waiter.countDown();
+ handlerLoggerPost(t, uriInfo);
+ }
+ });
+
+ try {
+ waiter.await();
+ } catch (final InterruptedException e) {
+ final String msg = "Problem while waiting for response";
+ LOG.warn(msg);
+ throw new RestconfDocumentedException(msg, e);
}
final ResponseBuilder responseBuilder = Response.status(Status.NO_CONTENT);
return responseBuilder.build();
}
+ protected void handlerLoggerPost(final Throwable t, final UriInfo uriInfo) {
+ if (t != null) {
+ final String errMsg = "Error creating data ";
+ LOG.warn(errMsg + (uriInfo != null ? uriInfo.getPath() : ""), t);
+ throw new RestconfDocumentedException(errMsg, t);
+ } else {
+ LOG.trace("Successfuly create data.");
+ }
+ }
+
private URI resolveLocation(final UriInfo uriInfo, final String uriBehindBase, final DOMMountPoint mountPoint, final YangInstanceIdentifier normalizedII) {
if(uriInfo == null) {
// This is null if invoked internally
final DOMMountPoint mountPoint = iiWithData.getMountPoint();
final YangInstanceIdentifier normalizedII = iiWithData.getInstanceIdentifier();
- try {
- if (mountPoint != null) {
- this.broker.commitConfigurationDataDelete(mountPoint, normalizedII);
- } else {
- this.broker.commitConfigurationDataDelete(normalizedII).get();
+ final CheckedFuture<Void, TransactionCommitFailedException> future;
+ if (mountPoint != null) {
+ future = this.broker.commitConfigurationDataDelete(mountPoint, normalizedII);
+ } else {
+ future = this.broker.commitConfigurationDataDelete(normalizedII);
+ }
+
+ final CountDownLatch waiter = new CountDownLatch(1);
+ Futures.addCallback(future, new FutureCallback<Void>() {
+
+ @Override
+ public void onSuccess(final Void result) {
+ handlerLoggerDelete(null);
+ waiter.countDown();
}
- } catch (final Exception e) {
- final Optional<Throwable> searchedException = Iterables.tryFind(Throwables.getCausalChain(e),
- Predicates.instanceOf(ModifiedNodeDoesNotExistException.class));
- if (searchedException.isPresent()) {
- throw new RestconfDocumentedException("Data specified for deleting doesn't exist.", ErrorType.APPLICATION, ErrorTag.DATA_MISSING);
+
+ @Override
+ public void onFailure(final Throwable t) {
+ waiter.countDown();
+ handlerLoggerDelete(t);
}
- final String errMsg = "Error while deleting data";
- LOG.info(errMsg, e);
- throw new RestconfDocumentedException(errMsg, e);
+
+ });
+
+ try {
+ waiter.await();
+ } catch (final InterruptedException e) {
+ final String msg = "Problem while waiting for response";
+ LOG.warn(msg);
+ throw new RestconfDocumentedException(msg, e);
}
+
return Response.status(Status.OK).build();
}
+ protected void handlerLoggerDelete(final Throwable t) {
+ if (t != null) {
+ final String errMsg = "Error while deleting data";
+ LOG.info(errMsg, t);
+ throw new RestconfDocumentedException(errMsg, t);
+ } else {
+ LOG.trace("Successfuly delete data.");
+ }
+ }
+
/**
* Subscribes to some path in schema context (stream) to listen on changes on this stream.
*
*/
@Override
public Response subscribeToStream(final String identifier, final UriInfo uriInfo) {
+ if (identifier.contains(DATA_SUBSCR)) {
+ return dataSubs(identifier, uriInfo);
+ } else if (identifier.contains(NOTIFICATION_STREAM)) {
+ return notifStream(identifier, uriInfo);
+ }
+ final String msg = "Bad type of notification of sal-remote";
+ LOG.warn(msg);
+ throw new RestconfDocumentedException(msg);
+ }
+
+ /**
+ * Register notification listener by stream name
+ *
+ * @param identifier
+ * - stream name
+ * @param uriInfo
+ * - uriInfo
+ * @return {@link Response}
+ */
+ private Response notifStream(final String identifier, final UriInfo uriInfo) {
+ final String streamName = Notificator.createStreamNameFromUri(identifier);
+ if (Strings.isNullOrEmpty(streamName)) {
+ throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
+ }
+ final List<NotificationListenerAdapter> listeners = Notificator.getNotificationListenerFor(streamName);
+ if ((listeners == null) || listeners.isEmpty()) {
+ throw new RestconfDocumentedException("Stream was not found.", ErrorType.PROTOCOL,
+ ErrorTag.UNKNOWN_ELEMENT);
+ }
+
+ for (final NotificationListenerAdapter listener : listeners) {
+ this.broker.registerToListenNotification(listener);
+ }
+
+ final UriBuilder uriBuilder = uriInfo.getAbsolutePathBuilder();
+ int notificationPort = NOTIFICATION_PORT;
+ try {
+ final WebSocketServer webSocketServerInstance = WebSocketServer.getInstance();
+ notificationPort = webSocketServerInstance.getPort();
+ } catch (final NullPointerException e) {
+ WebSocketServer.createInstance(NOTIFICATION_PORT);
+ }
+ final UriBuilder uriToWebsocketServerBuilder = uriBuilder.port(notificationPort).scheme("ws");
+ final URI uriToWebsocketServer = uriToWebsocketServerBuilder.replacePath(streamName).build();
+
+ return Response.status(Status.OK).location(uriToWebsocketServer).build();
+ }
+
+ /**
+ * Register data change listener by stream name
+ *
+ * @param identifier
+ * - stream name
+ * @param uriInfo
+ * - uri info
+ * @return {@link Response}
+ */
+ private Response dataSubs(final String identifier, final UriInfo uriInfo) {
final String streamName = Notificator.createStreamNameFromUri(identifier);
if (Strings.isNullOrEmpty(streamName)) {
throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
if (context == null) {
throw new RestconfDocumentedException("Input is required.", ErrorType.PROTOCOL, ErrorTag.MALFORMED_MESSAGE);
}
- return this.broker.patchConfigurationDataWithinTransaction(context, this.controllerContext.getGlobalSchema());
+
+ try {
+ return this.broker.patchConfigurationDataWithinTransaction(context,
+ this.controllerContext.getGlobalSchema());
+ } catch (final InterruptedException e) {
+ LOG.debug("Patch transaction failed", e);
+ throw new RestconfDocumentedException(e.getMessage());
+ }
}
@Override
if (context == null) {
throw new RestconfDocumentedException("Input is required.", ErrorType.PROTOCOL, ErrorTag.MALFORMED_MESSAGE);
}
- return this.broker.patchConfigurationDataWithinTransaction(context, this.controllerContext.getGlobalSchema());
+
+ try {
+ return this.broker.patchConfigurationDataWithinTransaction(context,
+ this.controllerContext.getGlobalSchema());
+ } catch (final InterruptedException e) {
+ LOG.debug("Patch transaction failed", e);
+ throw new RestconfDocumentedException(e.getMessage());
+ }
}
/**
* Load parameter for subscribing to stream from input composite node
*
- * @param compNode
+ * @param value
* contains value
* @return enum object if its string value is equal to {@code paramName}. In other cases null.
*/
return result;
}
- public BigInteger getOperationalReceived() {
- // TODO Auto-generated method stub
- return null;
- }
-
private MapNode makeModuleMapNode(final Set<Module> modules) {
Preconditions.checkNotNull(modules);
final Module restconfModule = getRestconfModule();
return streamNodeValues.build();
}
+
+ /**
+ * Prepare stream for notification
+ *
+ * @param payload
+ * - contains list of qnames of notifications
+ * @return - checked future object
+ */
+ private CheckedFuture<DOMRpcResult, DOMRpcException> invokeSalRemoteRpcNotifiStrRPC(
+ final NormalizedNodeContext payload) {
+ final ContainerNode data = (ContainerNode) payload.getData();
+ LeafSetNode leafSet = null;
+ String outputType = "XML";
+ for (final DataContainerChild<? extends PathArgument, ?> dataChild : data.getValue()) {
+ if (dataChild instanceof LeafSetNode) {
+ leafSet = (LeafSetNode) dataChild;
+ } else if (dataChild instanceof AugmentationNode) {
+ outputType = (String) (((AugmentationNode) dataChild).getValue()).iterator().next().getValue();
+ }
+ }
+
+ final Collection<LeafSetEntryNode> entryNodes = leafSet.getValue();
+ final List<SchemaPath> paths = new ArrayList<>();
+ String streamName = CREATE_NOTIFICATION_STREAM + "/";
+
+ final Iterator<LeafSetEntryNode> iterator = entryNodes.iterator();
+ while (iterator.hasNext()) {
+ final QName valueQName = QName.create((String) iterator.next().getValue());
+ final Module module = ControllerContext.getInstance()
+ .findModuleByNamespace(valueQName.getModule().getNamespace());
+ Preconditions.checkNotNull(module, "Module for namespace " + valueQName.getModule().getNamespace()
+ + " does not exist");
+ NotificationDefinition notifiDef = null;
+ for (final NotificationDefinition notification : module.getNotifications()) {
+ if (notification.getQName().equals(valueQName)) {
+ notifiDef = notification;
+ break;
+ }
+ }
+ final String moduleName = module.getName();
+ Preconditions.checkNotNull(notifiDef,
+ "Notification " + valueQName + "doesn't exist in module " + moduleName);
+ paths.add(notifiDef.getPath());
+ streamName = streamName + moduleName + ":" + valueQName.getLocalName();
+ if (iterator.hasNext()) {
+ streamName = streamName + ",";
+ }
+ }
+
+ final QName rpcQName = payload.getInstanceIdentifierContext().getSchemaNode().getQName();
+ final QName outputQname = QName.create(rpcQName, "output");
+ final QName streamNameQname = QName.create(rpcQName, "notification-stream-identifier");
+
+ final ContainerNode output = ImmutableContainerNodeBuilder.create()
+ .withNodeIdentifier(new NodeIdentifier(outputQname))
+ .withChild(ImmutableNodes.leafNode(streamNameQname, streamName)).build();
+
+ if (!Notificator.existNotificationListenerFor(streamName)) {
+ Notificator.createNotificationListener(paths, streamName, outputType);
+ }
+
+ final DOMRpcResult defaultDOMRpcResult = new DefaultDOMRpcResult(output);
+
+ return Futures.immediateCheckedFuture(defaultDOMRpcResult);
+ }
+
+ private class TryOfPutData {
+ int tries = 2;
+ boolean done = false;
+
+ void countDown() {
+ this.tries--;
+ }
+
+ void done() {
+ this.done = true;
+ }
+
+ boolean isDone() {
+ return this.done;
+ }
+ int countGet() {
+ return this.tries;
+ }
+ }
}