2 * Copyright (C) 2014 Red Hat, Inc.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 * Authors : Dave Tucker, Madhu Venugopal
11 package org.opendaylight.ovsdb.openstack.netvirt.providers.openflow13;
13 import com.google.common.collect.Lists;
14 import com.google.common.collect.Maps;
15 import java.util.List;
17 import java.util.concurrent.BlockingQueue;
18 import java.util.concurrent.ExecutorService;
19 import java.util.concurrent.Executors;
20 import java.util.concurrent.LinkedBlockingQueue;
21 import org.osgi.framework.ServiceReference;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
25 public class PipelineOrchestratorImpl implements PipelineOrchestrator {
27 private static final Logger logger = LoggerFactory.getLogger(PipelineOrchestratorImpl.class);
28 private List<Service> staticPipeline = Lists.newArrayList(
30 Service.ARP_RESPONDER,
33 Service.LOAD_BALANCER,
35 Service.L3_FORWARDING,
41 Map<Service, AbstractServiceInstance> serviceRegistry = Maps.newConcurrentMap();
42 private volatile BlockingQueue<String> queue;
43 private ExecutorService eventHandler;
44 public PipelineOrchestratorImpl() {
47 public void registerService(final ServiceReference ref, AbstractServiceInstance serviceInstance){
48 Service service = (Service)ref.getProperty(AbstractServiceInstance.SERVICE_PROPERTY);
49 serviceRegistry.put(service, serviceInstance);
52 public void unregisterService(final ServiceReference ref) {
53 serviceRegistry.remove(ref.getProperty(AbstractServiceInstance.SERVICE_PROPERTY));
56 public Service getNextServiceInPipeline(Service service) {
57 int index = staticPipeline.indexOf(service);
58 if (index >= staticPipeline.size() - 1) return null;
59 return staticPipeline.get(index + 1);
63 public AbstractServiceInstance getServiceInstance(Service service) {
64 if (service == null) return null;
65 return serviceRegistry.get(service);
69 eventHandler = Executors.newSingleThreadExecutor();
70 this.queue = new LinkedBlockingQueue<String>();
74 eventHandler.submit(new Runnable() {
79 String nodeId = queue.take();
81 * Since we are hooking on OpendaylightInventoryListener and as observed in
82 * Bug 1997 multiple Threads trying to write to a same table at the same time
83 * causes programming issues. Hence delaying the programming by a second to
84 * avoid the clash. This hack/workaround should be removed once Bug 1997 is resolved.
86 logger.info(">>>>> dequeue: {}", nodeId);
88 for (Service service : staticPipeline) {
89 AbstractServiceInstance serviceInstance = getServiceInstance(service);
90 serviceInstance.programDefaultPipelineRule(nodeId);
93 } catch (Exception e) {
94 logger.warn("Processing interrupted, terminating ", e);
97 while (!queue.isEmpty()) {
107 eventHandler.shutdownNow();
111 public void enqueue(String nodeId) {
112 logger.info(">>>>> enqueue: {}", nodeId);
114 queue.put(new String(nodeId));
115 } catch (InterruptedException e) {
116 logger.warn("Failed to enqueue operation {}", nodeId, e);