import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
+import org.opendaylight.ovsdb.openstack.netvirt.api.Action;
+import org.opendaylight.ovsdb.openstack.netvirt.api.NodeCacheListener;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
import org.osgi.framework.ServiceReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class PipelineOrchestratorImpl implements PipelineOrchestrator {
+public class PipelineOrchestratorImpl implements NodeCacheListener, PipelineOrchestrator {
private static final Logger logger = LoggerFactory.getLogger(PipelineOrchestratorImpl.class);
private List<Service> staticPipeline = Lists.newArrayList(
Service.L2_FORWARDING
);
Map<Service, AbstractServiceInstance> serviceRegistry = Maps.newConcurrentMap();
- private volatile BlockingQueue<String> queue;
+ private volatile BlockingQueue<Node> queue;
private ExecutorService eventHandler;
public PipelineOrchestratorImpl() {
}
public void init() {
eventHandler = Executors.newSingleThreadExecutor();
- this.queue = new LinkedBlockingQueue<String>();
+ this.queue = new LinkedBlockingQueue<Node>();
logger.info(">>>>> init PipelineOrchestratorImpl");
}
public void run() {
try {
while (true) {
- String nodeId = queue.take();
+ Node node = queue.take();
/*
* Since we are hooking on OpendaylightInventoryListener and as observed in
* Bug 1997 multiple Threads trying to write to a same table at the same time
* causes programming issues. Hence delaying the programming by a second to
* avoid the clash. This hack/workaround should be removed once Bug 1997 is resolved.
*/
- logger.info(">>>>> dequeue: {}", nodeId);
+ logger.info(">>>>> dequeue: {}", node);
Thread.sleep(1000);
for (Service service : staticPipeline) {
AbstractServiceInstance serviceInstance = getServiceInstance(service);
logger.info("pipeline: {} - {}", service, serviceInstance);
if (serviceInstance != null) {
- serviceInstance.programDefaultPipelineRule(nodeId);
+ serviceInstance.programDefaultPipelineRule(node);
}
}
}
}
@Override
- public void enqueue(String nodeId) {
- logger.info(">>>>> enqueue: {}", nodeId);
+ public void enqueue(Node node) {
+ logger.info(">>>>> enqueue: {}", node);
try {
- queue.put(new String(nodeId));
+ queue.put(node);
} catch (InterruptedException e) {
- logger.warn("Failed to enqueue operation {}", nodeId, e);
+ logger.warn("Failed to enqueue operation {}", node, e);
}
}
+
+ @Override
+ public void notifyNode(Node node, Action action) {
+ enqueue(node);
+ }
}