* </pre>
*/
// FIXME: this really should be a normal RPC implementation
- static DOMRpcResult createDataChangeNotifiStream(final ContainerNode input,
+ static DOMRpcResult createDataChangeNotifiStream(final ListenersBroker listenersBroker, final ContainerNode input,
final EffectiveModelContext refSchemaCtx) {
// parsing out of container with settings and path
final YangInstanceIdentifier path = preparePath(input);
final String streamName = streamNameBuilder.toString();
// registration of the listener
- ListenersBroker.getInstance().registerDataChangeListener(path, streamName, outputType);
+ listenersBroker.registerDataChangeListener(path, streamName, outputType);
// building of output
return new DefaultDOMRpcResult(Builders.containerBuilder()
ErrorTag.OPERATION_FAILED);
}
- final DeviceNotificationListenerAdaptor notificationListenerAdapter = ListenersBroker.getInstance()
+ final DeviceNotificationListenerAdaptor notificationListenerAdapter = streamUtil.listenersBroker()
.registerDeviceNotificationListener(deviceName, prepareOutputType(input), mountModelContext,
mountPointService, mountPoint.getIdentifier());
notificationListenerAdapter.listen(mountNotifService, notificationPaths);
private final SubscribeToStreamUtil streamUtils;
private final DOMActionService actionService;
private final DOMDataBroker dataBroker;
+ private final ListenersBroker listenersBroker = ListenersBroker.getInstance();
public RestconfDataServiceImpl(final DatabindProvider databindProvider,
final DOMDataBroker dataBroker, final DOMMountPointService mountPointService,
final var notifName = notification.argument();
writeNotificationStreamToDatastore(schemaContext, uriInfo, transaction,
- createYangNotifiStream(moduleName, notifName, NotificationOutputType.XML));
+ createYangNotifiStream(listenersBroker, moduleName, notifName, NotificationOutputType.XML));
writeNotificationStreamToDatastore(schemaContext, uriInfo, transaction,
- createYangNotifiStream(moduleName, notifName, NotificationOutputType.JSON));
+ createYangNotifiStream(listenersBroker, moduleName, notifName, NotificationOutputType.JSON));
});
}
}
}
- private static NotificationListenerAdapter createYangNotifiStream(final String moduleName, final QName notifName,
- final NotificationOutputType outputType) {
+ private static NotificationListenerAdapter createYangNotifiStream(final ListenersBroker listenersBroker,
+ final String moduleName, final QName notifName, final NotificationOutputType outputType) {
final var streamName = createNotificationStreamName(moduleName, notifName.getLocalName(), outputType);
- final var listenersBroker = ListenersBroker.getInstance();
final var existing = listenersBroker.notificationListenerFor(streamName);
return existing != null ? existing
final var mountPoint = context.getMountPoint();
if (mountPoint == null) {
if (CreateDataChangeEventSubscription.QNAME.equals(rpcName)) {
- future = Futures.immediateFuture(CreateStreamUtil.createDataChangeNotifiStream(input, schemaContext));
+ future = Futures.immediateFuture(CreateStreamUtil.createDataChangeNotifiStream(
+ streamUtils.listenersBroker(), input, schemaContext));
} else if (SubscribeDeviceNotification.QNAME.equals(rpcName)) {
final String baseUrl = streamUtils.prepareUriByStreamName(uriInfo, "").toString();
future = Futures.immediateFuture(CreateStreamUtil.createDeviceNotificationListener(baseUrl, input,
package org.opendaylight.restconf.nb.rfc8040.rests.services.impl;
import static com.google.common.base.Strings.isNullOrEmpty;
+import static java.util.Objects.requireNonNull;
import com.google.common.base.Splitter;
import java.net.URI;
import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenerAdapter;
import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
-import org.opendaylight.restconf.nb.rfc8040.streams.listeners.NotificationListenerAdapter;
import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
import org.opendaylight.yangtools.yang.common.ErrorTag;
import org.opendaylight.yangtools.yang.common.ErrorType;
* Implementation of SubscribeToStreamUtil for Server-sent events.
*/
private static final class ServerSentEvents extends SubscribeToStreamUtil {
- static final ServerSentEvents INSTANCE = new ServerSentEvents();
+ static final ServerSentEvents INSTANCE = new ServerSentEvents(ListenersBroker.getInstance());
+
+ private ServerSentEvents(final ListenersBroker listenersBroker) {
+ super(listenersBroker);
+ }
@Override
public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
* Implementation of SubscribeToStreamUtil for Web sockets.
*/
private static final class WebSockets extends SubscribeToStreamUtil {
- static final WebSockets INSTANCE = new WebSockets();
+ static final WebSockets INSTANCE = new WebSockets(ListenersBroker.getInstance());
+
+ private WebSockets(final ListenersBroker listenersBroker) {
+ super(listenersBroker);
+ }
@Override
public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
private static final Logger LOG = LoggerFactory.getLogger(SubscribeToStreamUtil.class);
private static final Splitter SLASH_SPLITTER = Splitter.on('/');
- SubscribeToStreamUtil() {
- // Hidden on purpose
+ private final @NonNull ListenersBroker listenersBroker;
+
+ SubscribeToStreamUtil(final ListenersBroker listenersBroker) {
+ this.listenersBroker = requireNonNull(listenersBroker);
}
static SubscribeToStreamUtil serverSentEvents() {
return WebSockets.INSTANCE;
}
+ public final @NonNull ListenersBroker listenersBroker() {
+ return listenersBroker;
+ }
+
/**
* Prepare URL from base name and stream name.
*
throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
}
- final NotificationListenerAdapter notificationListenerAdapter = ListenersBroker.getInstance()
- .notificationListenerFor(streamName);
+ final var notificationListenerAdapter = listenersBroker.notificationListenerFor(streamName);
if (notificationListenerAdapter == null) {
throw new RestconfDocumentedException(String.format("Stream with name %s was not found.", streamName),
ErrorType.PROTOCOL, ErrorTag.UNKNOWN_ELEMENT);
}
- final EffectiveModelContext schemaContext = handlersHolder.getDatabindProvider().currentContext()
- .modelContext();
+ final var schemaContext = handlersHolder.getDatabindProvider().currentContext().modelContext();
final URI uri = prepareUriByStreamName(uriInfo, streamName);
notificationListenerAdapter.setQueryParams(notificationQueryParams);
notificationListenerAdapter.listen(handlersHolder.getNotificationServiceHandler());
}
final String streamName = ListenersBroker.createStreamNameFromUri(identifier);
- final ListenerAdapter listener = ListenersBroker.getInstance().dataChangeListenerFor(streamName);
+ final ListenerAdapter listener = listenersBroker.dataChangeListenerFor(streamName);
if (listener == null) {
throw new RestconfDocumentedException("No listener found for stream " + streamName,
ErrorType.APPLICATION, ErrorTag.DATA_MISSING);
private final @NonNull EffectiveModelContext effectiveModel;
private final @NonNull DOMMountPointService mountPointService;
private final @NonNull YangInstanceIdentifier instanceIdentifier;
+ private final @NonNull ListenersBroker listenersBroker = ListenersBroker.getInstance();
private ListenerRegistration<DOMMountPointListener> reg;
}
}
});
- ListenersBroker.getInstance().removeAndCloseDeviceNotificationListener(this);
+ listenersBroker.removeAndCloseDeviceNotificationListener(this);
resetListenerRegistration();
}
}
import org.mockito.junit.MockitoJUnitRunner;
import org.opendaylight.mdsal.dom.api.DOMRpcResult;
import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
import org.opendaylight.yangtools.yang.common.ErrorTag;
import org.opendaylight.yangtools.yang.common.ErrorType;
import org.opendaylight.yangtools.yang.common.QName;
public class CreateStreamUtilTest {
private static EffectiveModelContext SCHEMA_CTX;
+ private final ListenersBroker listenersBroker = ListenersBroker.getInstance();
+
@BeforeClass
public static void setUp() {
SCHEMA_CTX = YangParserTestUtils.parseYangResourceDirectory("/streams");
@Test
public void createStreamTest() {
- final DOMRpcResult result = CreateStreamUtil.createDataChangeNotifiStream(
+ final DOMRpcResult result = CreateStreamUtil.createDataChangeNotifiStream(listenersBroker,
prepareDomPayload("create-data-change-event-subscription", RpcDefinition::getInput, "toaster", "path"),
SCHEMA_CTX);
assertEquals(List.of(), result.errors());
final var payload = prepareDomPayload("create-data-change-event-subscription", RpcDefinition::getInput,
"String value", "path");
final var errors = assertThrows(RestconfDocumentedException.class,
- () -> CreateStreamUtil.createDataChangeNotifiStream(payload, SCHEMA_CTX)).getErrors();
+ () -> CreateStreamUtil.createDataChangeNotifiStream(listenersBroker, payload, SCHEMA_CTX)).getErrors();
assertEquals(1, errors.size());
final var error = errors.get(0);
assertEquals(ErrorType.APPLICATION, error.getErrorType());
final var payload = prepareDomPayload("create-data-change-event-subscription2", RpcDefinition::getInput,
"toaster", "path2");
final var errors = assertThrows(RestconfDocumentedException.class,
- () -> CreateStreamUtil.createDataChangeNotifiStream(payload, SCHEMA_CTX)).getErrors();
+ () -> CreateStreamUtil.createDataChangeNotifiStream(listenersBroker, payload, SCHEMA_CTX)).getErrors();
assertEquals(1, errors.size());
final var error = errors.get(0);
assertEquals(ErrorType.APPLICATION, error.getErrorType());
+ "toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE";
private static EffectiveModelContext MODEL_CONTEXT;
+ // FIXME: NETCONF-1104: this should be non-static and set up for each test separately
+ private static ListenersBroker LISTENERS_BROKER;
@Mock
private DOMDataBroker dataBroker;
@BeforeClass
public static void beforeClass() {
MODEL_CONTEXT = YangParserTestUtils.parseYangResourceDirectory("/notifications");
+
+ final String name =
+ "data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE";
+ final ListenerAdapter adapter = new ListenerAdapter(YangInstanceIdentifier.of(
+ QName.create("http://netconfcentral.org/ns/toaster", "2009-11-20", "toaster")),
+ name, NotificationOutputType.JSON);
+ LISTENERS_BROKER = ListenersBroker.getInstance();
+ LISTENERS_BROKER.setDataChangeListeners(Map.of(name, adapter));
+ }
+
+ @AfterClass
+ public static void afterClass() {
+ if (LISTENERS_BROKER != null) {
+ LISTENERS_BROKER.setDataChangeListeners(Map.of());
+ LISTENERS_BROKER = null;
+ }
}
@Before
configurationSse = new StreamsConfiguration(0, 100, 10, true);
}
- @BeforeClass
- public static void setUpBeforeTest() {
- final String name =
- "data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE";
- final ListenerAdapter adapter = new ListenerAdapter(YangInstanceIdentifier.of(
- QName.create("http://netconfcentral.org/ns/toaster", "2009-11-20", "toaster")),
- name, NotificationOutputType.JSON);
- ListenersBroker.getInstance().setDataChangeListeners(Map.of(name, adapter));
- }
-
- @AfterClass
- public static void setUpAfterTest() {
- ListenersBroker.getInstance().setDataChangeListeners(Map.of());
- }
-
@Test
public void testSubscribeToStreamSSE() {
- ListenersBroker.getInstance().registerDataChangeListener(
+ LISTENERS_BROKER.registerDataChangeListener(
IdentifierCodec.deserialize("toaster:toaster/toasterStatus", MODEL_CONTEXT),
"data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE",
NotificationOutputType.XML);
@Test
public void testSubscribeToStreamWS() {
- ListenersBroker.getInstance().registerDataChangeListener(
+ LISTENERS_BROKER.registerDataChangeListener(
IdentifierCodec.deserialize("toaster:toaster/toasterStatus", MODEL_CONTEXT),
"data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE",
NotificationOutputType.XML);
public class JsonNotificationListenerTest extends AbstractNotificationListenerTest {
private static final Logger LOG = LoggerFactory.getLogger(JsonNotificationListenerTest.class);
+ private final ListenersBroker listenersBroker = ListenersBroker.getInstance();
+
@Test
public void notifi_leafTest() throws Exception {
final Absolute schemaPathNotifi = Absolute.of(QName.create(MODULE, "notifi-leaf"));
return ImmutableNodes.leafNode(leafQName, "value");
}
- private static String prepareJson(final DOMNotification notificationData, final Absolute schemaPathNotifi)
+ private String prepareJson(final DOMNotification notificationData, final Absolute schemaPathNotifi)
throws Exception {
- final var notifiAdapter = ListenersBroker.getInstance().registerNotificationListener(
- schemaPathNotifi, "json-stream", NotificationOutputType.JSON);
+ final var notifiAdapter = listenersBroker.registerNotificationListener(schemaPathNotifi, "json-stream",
+ NotificationOutputType.JSON);
return notifiAdapter.formatter().eventData(SCHEMA_CONTEXT, notificationData, Instant.now()).orElseThrow();
}
}
@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class XmlNotificationListenerTest extends AbstractNotificationListenerTest {
+ private final ListenersBroker listenersBroker = ListenersBroker.getInstance();
+
@Test
public void notifi_leafTest() throws Exception {
final Absolute schemaPathNotifi = Absolute.of(QName.create(MODULE, "notifi-leaf"));
return ImmutableNodes.leafNode(leafQName, "value");
}
- private static String prepareXmlResult(final DOMNotification notificationData, final Absolute schemaPathNotifi)
+ private String prepareXmlResult(final DOMNotification notificationData, final Absolute schemaPathNotifi)
throws Exception {
- final var notifiAdapter = ListenersBroker.getInstance().registerNotificationListener(
- schemaPathNotifi, "xml-stream", NotificationOutputType.XML);
+ final var notifiAdapter = listenersBroker.registerNotificationListener(schemaPathNotifi, "xml-stream",
+ NotificationOutputType.XML);
return notifiAdapter.formatter().eventData(SCHEMA_CONTEXT, notificationData, Instant.now()).orElseThrow();
}
}