*/
package org.opendaylight.restconf.nb.rfc8040.rests.services.impl;
-import static java.util.Objects.requireNonNull;
-
import com.google.common.collect.ImmutableSet;
-import java.util.Set;
-import java.util.stream.Collectors;
import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMMountPoint;
import org.opendaylight.mdsal.dom.api.DOMSchemaService;
import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
-import org.opendaylight.restconf.nb.rfc8040.streams.listeners.DeviceNotificationListenerAdaptor;
import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
-import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionInput;
// FIXME: this really should be a normal RPC implementation
static ContainerNode createDataChangeNotifiStream(final ListenersBroker listenersBroker, final ContainerNode input,
final EffectiveModelContext refSchemaCtx) {
- // parsing out of container with settings and path
- final YangInstanceIdentifier path = preparePath(input);
-
- // building of stream name
- final StringBuilder streamNameBuilder = new StringBuilder(
- prepareDataChangeNotifiStreamName(path, requireNonNull(refSchemaCtx), input));
- final NotificationOutputType outputType = prepareOutputType(input);
- if (outputType.equals(NotificationOutputType.JSON)) {
- streamNameBuilder.append('/').append(outputType.getName());
- }
- final String streamName = streamNameBuilder.toString();
-
- // registration of the listener
- listenersBroker.registerDataChangeListener(path, streamName, outputType);
+ final var datastoreName = extractStringLeaf(input, DATASTORE_NODEID);
+ final var scopeName = extractStringLeaf(input, SCOPE_NODEID);
+ final var adapter = listenersBroker.registerDataChangeListener(refSchemaCtx,
+ datastoreName != null ? LogicalDatastoreType.valueOf(datastoreName) : LogicalDatastoreType.CONFIGURATION,
+ preparePath(input), scopeName != null ? Scope.ofName(scopeName) : Scope.BASE, prepareOutputType(input));
// building of output
return Builders.containerBuilder()
.withNodeIdentifier(SAL_REMOTE_OUTPUT_NODEID)
- .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, streamName))
+ .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, adapter.getStreamName()))
.build();
}
.sorted()
.collect(ImmutableSet.toImmutableSet());
- final var streamNameBuilder = new StringBuilder(RestconfStreamsConstants.NOTIFICATION_STREAM).append('/');
- var haveFirst = false;
for (var qname : qnames) {
- final var module = refSchemaCtx.findModuleStatement(qname.getModule())
- .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an unknown module",
- ErrorType.APPLICATION, ErrorTag.INVALID_VALUE));
- final var stmt = module.findSchemaTreeNode(qname)
- .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an notification",
- ErrorType.APPLICATION, ErrorTag.INVALID_VALUE));
- if (!(stmt instanceof NotificationEffectiveStatement)) {
- throw new RestconfDocumentedException(qname + " refers to a non-notification",
+ if (refSchemaCtx.findNotification(qname).isEmpty()) {
+ throw new RestconfDocumentedException(qname + " refers to an unknown notification",
ErrorType.APPLICATION, ErrorTag.INVALID_VALUE);
}
-
- if (haveFirst) {
- streamNameBuilder.append(',');
- } else {
- haveFirst = true;
- }
- streamNameBuilder.append(module.argument().getLocalName()).append(':').append(qname.getLocalName());
- }
- final var outputType = prepareOutputType(input);
- if (outputType.equals(NotificationOutputType.JSON)) {
- streamNameBuilder.append('/').append(outputType.getName());
}
- final var streamName = streamNameBuilder.toString();
-
// registration of the listener
- listenersBroker.registerNotificationListener(qnames, streamName, outputType);
+ final var adapter = listenersBroker.registerNotificationListener(refSchemaCtx, qnames,
+ prepareOutputType(input));
return Builders.containerBuilder()
.withNodeIdentifier(SAL_REMOTE_OUTPUT_NODEID)
- .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, streamName))
+ .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, adapter.getStreamName()))
.build();
}
final ListenersBroker listenersBroker) {
// parsing out of container with settings and path
// FIXME: ugly cast
- final YangInstanceIdentifier path =
- (YangInstanceIdentifier) input.findChildByArg(DEVICE_NOTIFICATION_PATH_NODEID)
+ final var path = (YangInstanceIdentifier) input.findChildByArg(DEVICE_NOTIFICATION_PATH_NODEID)
.map(DataContainerChild::body)
.orElseThrow(() -> new RestconfDocumentedException("No path specified", ErrorType.APPLICATION,
ErrorTag.DATA_MISSING));
.orElseThrow(() -> new RestconfDocumentedException("Mount point does not support notifications",
ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED));
- final EffectiveModelContext mountModelContext = mountPoint.getService(DOMSchemaService.class)
+ final var mountModelContext = mountPoint.getService(DOMSchemaService.class)
.orElseThrow(() -> new RestconfDocumentedException("Mount point schema not available",
ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED))
.getGlobalContext();
- final Set<Absolute> notificationPaths = mountModelContext.getModuleStatements().values().stream()
+ final var notificationPaths = mountModelContext.getModuleStatements().values().stream()
.flatMap(module -> module.streamEffectiveSubstatements(NotificationEffectiveStatement.class))
.map(notification -> Absolute.of(notification.argument()))
- .collect(Collectors.toUnmodifiableSet());
+ .collect(ImmutableSet.toImmutableSet());
if (notificationPaths.isEmpty()) {
throw new RestconfDocumentedException("Device does not support notification", ErrorType.APPLICATION,
ErrorTag.OPERATION_FAILED);
}
- final DeviceNotificationListenerAdaptor notificationListenerAdapter = listenersBroker
- .registerDeviceNotificationListener(deviceName, prepareOutputType(input), mountModelContext,
- mountPointService, mountPoint.getIdentifier());
+ final var notificationListenerAdapter = listenersBroker.registerDeviceNotificationListener(deviceName,
+ prepareOutputType(input), mountModelContext, mountPointService, mountPoint.getIdentifier());
notificationListenerAdapter.listen(mountNotifService, notificationPaths);
return Builders.containerBuilder()
.withNodeIdentifier(new NodeIdentifier(SubscribeDeviceNotificationOutput.QNAME))
- .withChild(ImmutableNodes.leafNode(DEVICE_NOTIFICATION_STREAM_PATH, baseUrl + deviceName + "?"
- + RestconfStreamsConstants.NOTIFICATION_TYPE + "=" + RestconfStreamsConstants.DEVICE))
+ .withChild(ImmutableNodes.leafNode(DEVICE_NOTIFICATION_STREAM_PATH,
+ baseUrl + notificationListenerAdapter.getStreamName() + "?"
+ + RestconfStreamsConstants.NOTIFICATION_TYPE + "=" + RestconfStreamsConstants.DEVICE))
.build();
}
return outputName != null ? NotificationOutputType.valueOf(outputName) : NotificationOutputType.XML;
}
- /**
- * Prepare stream name.
- *
- * @param path Path of element from which data-change-event notifications are going to be generated.
- * @param schemaContext Schema context.
- * @param data Container with stream settings (RPC create-stream).
- * @return Parsed stream name.
- */
- private static String prepareDataChangeNotifiStreamName(final YangInstanceIdentifier path,
- final EffectiveModelContext schemaContext, final ContainerNode data) {
- final String datastoreName = extractStringLeaf(data, DATASTORE_NODEID);
- final LogicalDatastoreType datastoreType = datastoreName != null ? LogicalDatastoreType.valueOf(datastoreName)
- : LogicalDatastoreType.CONFIGURATION;
-
- final String scopeName = extractStringLeaf(data, SCOPE_NODEID);
- // FIXME: this is not really used
- final Scope scope = scopeName != null ? Scope.ofName(scopeName) : Scope.BASE;
-
- return RestconfStreamsConstants.DATA_SUBSCRIPTION
- + "/" + ListenersBroker.createStreamNameFromUri(IdentifierCodec.serialize(path, schemaContext)
- + "/" + RestconfStreamsConstants.DATASTORE_PARAM_NAME + "=" + datastoreType
- + "/" + RestconfStreamsConstants.SCOPE_PARAM_NAME + "=" + scope);
- }
-
/**
* Prepare {@link YangInstanceIdentifier} of stream source.
*
import javax.inject.Inject;
import javax.inject.Singleton;
import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMMountPointService;
+import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
import org.opendaylight.restconf.nb.rfc8040.URLConstants;
import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
+import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
+import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.CreateDataChangeEventSubscriptionInput1.Scope;
import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
+import org.opendaylight.yangtools.yang.common.ErrorTag;
+import org.opendaylight.yangtools.yang.common.ErrorType;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
+import org.opendaylight.yangtools.yang.model.api.stmt.NotificationEffectiveStatement;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.slf4j.Logger;
* hasn't been created yet.
*
* @param path Path to data in data repository.
- * @param streamName Stream name.
* @param outputType Specific type of output for notifications - XML or JSON.
* @return Created or existing data-change listener adapter.
*/
- public ListenerAdapter registerDataChangeListener(final YangInstanceIdentifier path, final String streamName,
+ public ListenerAdapter registerDataChangeListener(final EffectiveModelContext modelContext,
+ final LogicalDatastoreType datastore, final YangInstanceIdentifier path, final Scope scope,
final NotificationOutputType outputType) {
- requireNonNull(path);
- requireNonNull(streamName);
- requireNonNull(outputType);
+ final var sb = new StringBuilder(RestconfStreamsConstants.DATA_SUBSCRIPTION)
+ .append('/').append(createStreamNameFromUri(IdentifierCodec.serialize(path, modelContext)))
+ .append('/').append(RestconfStreamsConstants.DATASTORE_PARAM_NAME).append('=').append(datastore)
+ .append('/').append(RestconfStreamsConstants.SCOPE_PARAM_NAME).append('=').append(scope);
+ if (outputType != NotificationOutputType.XML) {
+ sb.append('/').append(outputType.getName());
+ }
final long stamp = dataChangeListenersLock.writeLock();
try {
- return dataChangeListeners.computeIfAbsent(streamName,
- stream -> new ListenerAdapter(path, stream, outputType, this));
+ return dataChangeListeners.computeIfAbsent(sb.toString(),
+ streamName -> new ListenerAdapter(path, streamName, outputType, this));
} finally {
dataChangeListenersLock.unlockWrite(stamp);
}
* Creates new {@link NotificationDefinition} listener using input stream name and schema path
* if such listener haven't been created yet.
*
+ * @param refSchemaCtx reference {@link EffectiveModelContext}
* @param notifications {@link QName}s of accepted YANG notifications
- * @param streamName Stream name.
* @param outputType Specific type of output for notifications - XML or JSON.
* @return Created or existing notification listener adapter.
*/
- public NotificationListenerAdapter registerNotificationListener(final ImmutableSet<QName> notifications,
- final String streamName, final NotificationOutputType outputType) {
- requireNonNull(notifications);
- requireNonNull(streamName);
- requireNonNull(outputType);
+ public NotificationListenerAdapter registerNotificationListener(final EffectiveModelContext refSchemaCtx,
+ final ImmutableSet<QName> notifications, final NotificationOutputType outputType) {
+ final var sb = new StringBuilder(RestconfStreamsConstants.NOTIFICATION_STREAM).append('/');
+ var haveFirst = false;
+ for (var qname : notifications) {
+ final var module = refSchemaCtx.findModuleStatement(qname.getModule())
+ .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an unknown module",
+ ErrorType.APPLICATION, ErrorTag.INVALID_VALUE));
+ final var stmt = module.findSchemaTreeNode(qname)
+ .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an notification",
+ ErrorType.APPLICATION, ErrorTag.INVALID_VALUE));
+ if (!(stmt instanceof NotificationEffectiveStatement)) {
+ throw new RestconfDocumentedException(qname + " refers to a non-notification",
+ ErrorType.APPLICATION, ErrorTag.INVALID_VALUE);
+ }
+
+ if (haveFirst) {
+ sb.append(',');
+ } else {
+ haveFirst = true;
+ }
+ sb.append(module.argument().getLocalName()).append(':').append(qname.getLocalName());
+ }
+ if (outputType != NotificationOutputType.XML) {
+ sb.append('/').append(outputType.getName());
+ }
final long stamp = notificationListenersLock.writeLock();
try {
- return notificationListeners.computeIfAbsent(streamName,
- stream -> new NotificationListenerAdapter(notifications, stream, outputType, this));
+ return notificationListeners.computeIfAbsent(sb.toString(),
+ streamName -> new NotificationListenerAdapter(notifications, streamName, outputType, this));
} finally {
notificationListenersLock.unlockWrite(stamp);
}
* Creates new {@link DeviceNotificationListenerAdaptor} listener using input stream name and schema path
* if such listener haven't been created yet.
*
- * @param streamName Stream name.
+ * @param deviceName Device name.
* @param outputType Specific type of output for notifications - XML or JSON.
* @param refSchemaCtx Schema context of node
* @param mountPointService Mount point service
* @return Created or existing device notification listener adapter.
*/
- public DeviceNotificationListenerAdaptor registerDeviceNotificationListener(final String streamName,
- final NotificationOutputType outputType, final EffectiveModelContext refSchemaCtx,
- final DOMMountPointService mountPointService, final YangInstanceIdentifier path) {
-
+ public DeviceNotificationListenerAdaptor registerDeviceNotificationListener(final String deviceName,
+ final NotificationOutputType outputType, final EffectiveModelContext refSchemaCtx,
+ final DOMMountPointService mountPointService, final YangInstanceIdentifier path) {
final long stamp = deviceNotificationListenersLock.writeLock();
try {
- return deviceNotificationListeners.computeIfAbsent(streamName,
- stream -> new DeviceNotificationListenerAdaptor(streamName, outputType, refSchemaCtx,
+ return deviceNotificationListeners.computeIfAbsent(deviceName,
+ streamName -> new DeviceNotificationListenerAdaptor(deviceName, outputType, refSchemaCtx,
mountPointService, path, this));
} finally {
deviceNotificationListenersLock.unlockWrite(stamp);
}
return result;
}
-}
\ No newline at end of file
+}
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.opendaylight.restconf.nb.rfc8040.streams.listeners.AbstractNotificationListenerTest.MODEL_CONTEXT;
import com.google.common.collect.ImmutableClassToInstanceMap;
import java.net.URI;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;
import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataBroker;
import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
import org.opendaylight.restconf.nb.rfc8040.databind.DatabindContext;
import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider;
import org.opendaylight.restconf.nb.rfc8040.streams.StreamsConfiguration;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.AbstractNotificationListenerTest;
import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
+import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.CreateDataChangeEventSubscriptionInput1.Scope;
import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.common.ErrorTag;
import org.opendaylight.yangtools.yang.common.ErrorType;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
-import org.opendaylight.yangtools.yang.test.util.YangParserTestUtils;
@RunWith(MockitoJUnitRunner.StrictStubs.class)
-public class RestconfStreamsSubscriptionServiceImplTest {
+public class RestconfStreamsSubscriptionServiceImplTest extends AbstractNotificationListenerTest {
private static final String URI = "/rests/data/ietf-restconf-monitoring:restconf-state/streams/stream/"
+ "toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE";
- private static EffectiveModelContext MODEL_CONTEXT;
-
@Mock
private DOMDataBroker dataBroker;
@Mock
private StreamsConfiguration configurationSse;
private DatabindProvider databindProvider;
- @BeforeClass
- public static void beforeClass() {
- MODEL_CONTEXT = YangParserTestUtils.parseYangResourceDirectory("/notifications");
- }
@Before
public void setUp() throws URISyntaxException {
- final var name = "data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE";
- listenersBroker.registerDataChangeListener(
+ listenersBroker.registerDataChangeListener(MODEL_CONTEXT, LogicalDatastoreType.OPERATIONAL,
YangInstanceIdentifier.of(QName.create("http://netconfcentral.org/ns/toaster", "2009-11-20", "toaster")),
- name, NotificationOutputType.JSON);
+ Scope.ONE, NotificationOutputType.JSON);
final DOMDataTreeWriteTransaction wTx = mock(DOMDataTreeWriteTransaction.class);
doReturn(wTx).when(dataBroker).newWriteOnlyTransaction();
doReturn(CommitInfo.emptyFluentFuture()).when(wTx).commit();
@Test
public void testSubscribeToStreamSSE() {
- listenersBroker.registerDataChangeListener(
- IdentifierCodec.deserialize("toaster:toaster/toasterStatus", MODEL_CONTEXT),
- "data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE",
- NotificationOutputType.XML);
+ listenersBroker.registerDataChangeListener(MODEL_CONTEXT, LogicalDatastoreType.OPERATIONAL,
+ IdentifierCodec.deserialize("toaster:toaster/toasterStatus", MODEL_CONTEXT), Scope.ONE,
+ NotificationOutputType.XML);
final var streamsSubscriptionService = new RestconfStreamsSubscriptionServiceImpl(dataBroker,
notificationService, databindProvider, listenersBroker, configurationSse);
final var response = streamsSubscriptionService.subscribeToStream(
@Test
public void testSubscribeToStreamWS() {
- listenersBroker.registerDataChangeListener(
- IdentifierCodec.deserialize("toaster:toaster/toasterStatus", MODEL_CONTEXT),
- "data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE",
- NotificationOutputType.XML);
+ listenersBroker.registerDataChangeListener(MODEL_CONTEXT, LogicalDatastoreType.OPERATIONAL,
+ IdentifierCodec.deserialize("toaster:toaster/toasterStatus", MODEL_CONTEXT), Scope.ONE,
+ NotificationOutputType.XML);
final var streamsSubscriptionService = new RestconfStreamsSubscriptionServiceImpl(dataBroker,
notificationService, databindProvider, listenersBroker, configurationWs);
final var response = streamsSubscriptionService.subscribeToStream(
@Test
public void testSubscribeToStreamMissingDatastoreInPath() {
- final RestconfStreamsSubscriptionServiceImpl streamsSubscriptionService =
- new RestconfStreamsSubscriptionServiceImpl(dataBroker, notificationService, databindProvider,
- listenersBroker, configurationWs);
+ final var streamsSubscriptionService = new RestconfStreamsSubscriptionServiceImpl(dataBroker,
+ notificationService, databindProvider, listenersBroker, configurationWs);
final var errors = assertThrows(RestconfDocumentedException.class,
() -> streamsSubscriptionService.subscribeToStream("toaster:toaster/toasterStatus/scope=ONE", uriInfo))
.getErrors();
@Test
public void testSubscribeToStreamMissingScopeInPath() {
- final RestconfStreamsSubscriptionServiceImpl streamsSubscriptionService =
- new RestconfStreamsSubscriptionServiceImpl(dataBroker, notificationService, databindProvider,
- listenersBroker, configurationWs);
+ final var streamsSubscriptionService = new RestconfStreamsSubscriptionServiceImpl(dataBroker,
+ notificationService, databindProvider, listenersBroker, configurationWs);
final var errors = assertThrows(RestconfDocumentedException.class,
() -> streamsSubscriptionService.subscribeToStream("toaster:toaster/toasterStatus/datastore=OPERATIONAL",
uriInfo)).getErrors();
*/
package org.opendaylight.restconf.nb.rfc8040.streams;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertNull;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import java.net.URI;
import java.util.concurrent.ScheduledExecutorService;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.restconf.nb.rfc8040.streams.WebSocketInitializer.WebSocketFactory;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.AbstractNotificationListenerTest;
import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
+import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.CreateDataChangeEventSubscriptionInput1.Scope;
import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-public class WebSocketFactoryTest {
+@ExtendWith(MockitoExtension.class)
+class WebSocketFactoryTest extends AbstractNotificationListenerTest {
private static final String REGISTERED_STREAM_NAME = "data-change-event-subscription/"
- + "toaster:toaster/datastore=CONFIGURATION/scope=SUBTREE";
- private static final YangInstanceIdentifier TOASTER_YIID = YangInstanceIdentifier.builder()
- .node(QName.create("http://netconfcentral.org/ns/toaster", "2009-11-20", "toaster"))
- .build();
+ + "toaster:toaster/datastore=CONFIGURATION/scope=SUBTREE/JSON";
private final ListenersBroker listenersBroker = new ListenersBroker();
- private final WebSocketFactory webSocketFactory = new WebSocketFactory(mock(ScheduledExecutorService.class),
- listenersBroker, 5000, 2000);
+ @Mock
+ private ScheduledExecutorService execService;
+ @Mock
+ private ServletUpgradeRequest upgradeRequest;
+ @Mock
+ private ServletUpgradeResponse upgradeResponse;
- @Before
- public void prepareListenersBroker() {
- listenersBroker.registerDataChangeListener(TOASTER_YIID, REGISTERED_STREAM_NAME,
- NotificationOutputTypeGrouping.NotificationOutputType.JSON);
+ private WebSocketFactory webSocketFactory;
+
+ @BeforeEach
+ void prepareListenersBroker() {
+ webSocketFactory = new WebSocketFactory(execService, listenersBroker, 5000, 2000);
+
+ listenersBroker.registerDataChangeListener(MODEL_CONTEXT, LogicalDatastoreType.CONFIGURATION,
+ YangInstanceIdentifier.of(QName.create("http://netconfcentral.org/ns/toaster", "2009-11-20", "toaster")),
+ Scope.SUBTREE, NotificationOutputTypeGrouping.NotificationOutputType.JSON);
}
@Test
- public void createWebSocketSuccessfully() {
- final ServletUpgradeRequest upgradeRequest = mock(ServletUpgradeRequest.class);
- final ServletUpgradeResponse upgradeResponse = mock(ServletUpgradeResponse.class);
+ void createWebSocketSuccessfully() {
doReturn(URI.create('/' + REGISTERED_STREAM_NAME + '/')).when(upgradeRequest).getRequestURI();
- final Object webSocket = webSocketFactory.createWebSocket(upgradeRequest, upgradeResponse);
- assertThat(webSocket, instanceOf(WebSocketSessionHandler.class));
+ assertInstanceOf(WebSocketSessionHandler.class,
+ webSocketFactory.createWebSocket(upgradeRequest, upgradeResponse));
verify(upgradeResponse).setSuccess(true);
verify(upgradeResponse).setStatusCode(101);
}
@Test
- public void createWebSocketUnsuccessfully() {
- final ServletUpgradeRequest upgradeRequest = mock(ServletUpgradeRequest.class);
- final ServletUpgradeResponse upgradeResponse = mock(ServletUpgradeResponse.class);
+ void createWebSocketUnsuccessfully() {
doReturn(URI.create('/' + REGISTERED_STREAM_NAME + '/' + "toasterStatus"))
.when(upgradeRequest).getRequestURI();
- final Object webSocket = webSocketFactory.createWebSocket(upgradeRequest, upgradeResponse);
- assertNull(webSocket);
+ assertNull(webSocketFactory.createWebSocket(upgradeRequest, upgradeResponse));
verify(upgradeResponse).setSuccess(false);
verify(upgradeResponse).setStatusCode(404);
}
*/
package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
import org.opendaylight.yangtools.yang.common.QNameModule;
import org.opendaylight.yangtools.yang.common.Revision;
import org.opendaylight.yangtools.yang.common.XMLNamespace;
import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
import org.opendaylight.yangtools.yang.test.util.YangParserTestUtils;
-abstract class AbstractNotificationListenerTest {
+public abstract class AbstractNotificationListenerTest {
static final QNameModule MODULE = QNameModule.create(XMLNamespace.of("notifi:mod"), Revision.of("2016-11-23"));
- static EffectiveModelContext SCHEMA_CONTEXT;
-
- @BeforeClass
- public static final void beforeClass() {
- SCHEMA_CONTEXT = YangParserTestUtils.parseYangResourceDirectory("/notifications");
- }
-
- @AfterClass
- public static final void afterClass() {
- SCHEMA_CONTEXT = null;
- }
+ public static final EffectiveModelContext MODEL_CONTEXT =
+ YangParserTestUtils.parseYangResourceDirectory("/notifications");
}
private String prepareJson(final DOMNotification notificationData, final QName schemaPathNotifi)
throws Exception {
- final var notifiAdapter = listenersBroker.registerNotificationListener(ImmutableSet.of(schemaPathNotifi),
- "json-stream", NotificationOutputType.JSON);
- return notifiAdapter.formatter().eventData(SCHEMA_CONTEXT, notificationData, Instant.now()).orElseThrow();
+ final var notifiAdapter = listenersBroker.registerNotificationListener(MODEL_CONTEXT,
+ ImmutableSet.of(schemaPathNotifi), NotificationOutputType.JSON);
+ return notifiAdapter.formatter().eventData(MODEL_CONTEXT, notificationData, Instant.now()).orElseThrow();
}
}
private String prepareXmlResult(final DOMNotification notificationData, final QName schemaPathNotifi)
throws Exception {
- final var notifiAdapter = listenersBroker.registerNotificationListener(ImmutableSet.of(schemaPathNotifi),
- "xml-stream", NotificationOutputType.XML);
- return notifiAdapter.formatter().eventData(SCHEMA_CONTEXT, notificationData, Instant.now()).orElseThrow();
+ final var notifiAdapter = listenersBroker.registerNotificationListener(MODEL_CONTEXT,
+ ImmutableSet.of(schemaPathNotifi), NotificationOutputType.XML);
+ return notifiAdapter.formatter().eventData(MODEL_CONTEXT, notificationData, Instant.now()).orElseThrow();
}
}