+
+ /**
+ * Prepare stream for notification
+ *
+ * @param payload
+ * - contains list of qnames of notifications
+ * @return - checked future object
+ */
+ private CheckedFuture<DOMRpcResult, DOMRpcException> invokeSalRemoteRpcNotifiStrRPC(
+ final NormalizedNodeContext payload) {
+ final ContainerNode data = (ContainerNode) payload.getData();
+ LeafSetNode leafSet = null;
+ String outputType = "XML";
+ for (final DataContainerChild<? extends PathArgument, ?> dataChild : data.getValue()) {
+ if (dataChild instanceof LeafSetNode) {
+ leafSet = (LeafSetNode) dataChild;
+ } else if (dataChild instanceof AugmentationNode) {
+ outputType = (String) (((AugmentationNode) dataChild).getValue()).iterator().next().getValue();
+ }
+ }
+
+ final Collection<LeafSetEntryNode> entryNodes = leafSet.getValue();
+ final List<SchemaPath> paths = new ArrayList<>();
+ String streamName = CREATE_NOTIFICATION_STREAM + "/";
+
+ final Iterator<LeafSetEntryNode> iterator = entryNodes.iterator();
+ while (iterator.hasNext()) {
+ final QName valueQName = QName.create((String) iterator.next().getValue());
+ final Module module = ControllerContext.getInstance()
+ .findModuleByNamespace(valueQName.getModule().getNamespace());
+ Preconditions.checkNotNull(module, "Module for namespace " + valueQName.getModule().getNamespace()
+ + " does not exist");
+ NotificationDefinition notifiDef = null;
+ for (final NotificationDefinition notification : module.getNotifications()) {
+ if (notification.getQName().equals(valueQName)) {
+ notifiDef = notification;
+ break;
+ }
+ }
+ final String moduleName = module.getName();
+ Preconditions.checkNotNull(notifiDef,
+ "Notification " + valueQName + "doesn't exist in module " + moduleName);
+ paths.add(notifiDef.getPath());
+ streamName = streamName + moduleName + ":" + valueQName.getLocalName();
+ if (iterator.hasNext()) {
+ streamName = streamName + ",";
+ }
+ }
+
+ final QName rpcQName = payload.getInstanceIdentifierContext().getSchemaNode().getQName();
+ final QName outputQname = QName.create(rpcQName, "output");
+ final QName streamNameQname = QName.create(rpcQName, "notification-stream-identifier");
+
+ final ContainerNode output = ImmutableContainerNodeBuilder.create()
+ .withNodeIdentifier(new NodeIdentifier(outputQname))
+ .withChild(ImmutableNodes.leafNode(streamNameQname, streamName)).build();
+
+ if (!Notificator.existNotificationListenerFor(streamName)) {
+ Notificator.createNotificationListener(paths, streamName, outputType);
+ }
+
+ final DOMRpcResult defaultDOMRpcResult = new DefaultDOMRpcResult(output);
+
+ return Futures.immediateCheckedFuture(defaultDOMRpcResult);
+ }
+
+ private class TryOfPutData {
+ int tries = 2;
+ boolean done = false;
+
+ void countDown() {
+ this.tries--;
+ }
+
+ void done() {
+ this.done = true;
+ }
+
+ boolean isDone() {
+ return this.done;
+ }
+ int countGet() {
+ return this.tries;
+ }
+ }