Initial message bus implementation
[controller.git] / opendaylight / md-sal / messagebus-impl / src / main / java / org / opendaylight / controller / messagebus / app / impl / EventAggregator.java
diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventAggregator.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventAggregator.java
new file mode 100644 (file)
index 0000000..4b77bf2
--- /dev/null
@@ -0,0 +1,75 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.messagebus.app.impl;
+
+import java.util.List;
+import java.util.concurrent.Future;
+import org.opendaylight.controller.mdsal.MdSAL;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicInput;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutput;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicInput;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.EventAggregatorService;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO: implement topic created notification
+public class EventAggregator implements EventAggregatorService {
+    private static final Logger LOGGER = LoggerFactory.getLogger(EventAggregator.class);
+
+    private final MdSAL mdSAL;
+    private final EventSourceTopology eventSourceTopology;
+
+    public EventAggregator(final MdSAL mdSAL, final EventSourceTopology eventSourceTopology) {
+        this.mdSAL = mdSAL;
+        this.eventSourceTopology = eventSourceTopology;
+    }
+
+    public void mdsalReady() {
+        mdSAL.addRpcImplementation(EventAggregatorService.class, this);
+    }
+
+    @Override
+    public Future<RpcResult<CreateTopicOutput>> createTopic(final CreateTopicInput input) {
+        LOGGER.info("Received Topic creation request: NotificationPattern -> {}, NodeIdPattern -> {}",
+                input.getNotificationPattern(),
+                input.getNodeIdPattern());
+
+        Topic topic = new Topic(new NotificationPattern(input.getNotificationPattern()), input.getNodeIdPattern().getValue(), mdSAL);
+
+        //# Make sure we capture all nodes from now on
+        eventSourceTopology.registerDataChangeListener(topic);
+
+        //# Notify existing nodes
+        //# Code reader note: Context of Node type is NetworkTopology
+        List<Node> nodes = eventSourceTopology.snapshot();
+        for (Node node : nodes) {
+            NodeId nodeIdToNotify = node.getAugmentation(Node1.class).getEventSourceNode();
+            topic.notifyNode(nodeIdToNotify);
+        }
+
+        CreateTopicOutput cto = new CreateTopicOutputBuilder()
+                .setTopicId(topic.getTopicId())
+                .build();
+
+        return Util.resultFor(cto);
+    }
+
+    @Override
+    public Future<RpcResult<Void>> destroyTopic(final DestroyTopicInput input) {
+        // 1. UNREGISTER DATA CHANGE LISTENER -> ?
+        // 2. CLOSE TOPIC
+        return null;
+    }
+}