2 * Copyright (C) 2014 Red Hat, Inc.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 * Authors : Dave Tucker, Madhu Venugopal
11 package org.opendaylight.ovsdb.openstack.netvirt.providers.openflow13;
13 import java.util.List;
15 import java.util.concurrent.BlockingQueue;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.Executors;
18 import java.util.concurrent.LinkedBlockingQueue;
20 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
21 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
22 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
23 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
24 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
25 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRemoved;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorUpdated;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRemoved;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdated;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.OpendaylightInventoryListener;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
35 import org.opendaylight.yangtools.yang.binding.DataObject;
36 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
37 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.InstanceIdentifierBuilder;
38 import org.osgi.framework.ServiceReference;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
42 import com.google.common.base.Optional;
43 import com.google.common.collect.Lists;
44 import com.google.common.collect.Maps;
45 import com.google.common.util.concurrent.CheckedFuture;
46 import com.google.common.util.concurrent.FutureCallback;
47 import com.google.common.util.concurrent.Futures;
49 public class PipelineOrchestratorImpl implements PipelineOrchestrator, OpendaylightInventoryListener, TransactionChainListener {
51 private static final Logger logger = LoggerFactory.getLogger(PipelineOrchestratorImpl.class);
52 private List<Service> staticPipeline = Lists.newArrayList(
54 Service.ARP_RESPONDER,
57 Service.LOAD_BALANCER,
64 Map<Service, AbstractServiceInstance> serviceRegistry = Maps.newConcurrentMap();
65 private volatile MdsalConsumer mdsalConsumer;
66 private volatile BlockingQueue<String> queue;
67 private ExecutorService eventHandler;
68 public PipelineOrchestratorImpl() {
71 public void registerService(final ServiceReference ref, AbstractServiceInstance serviceInstance){
72 Service service = (Service)ref.getProperty(AbstractServiceInstance.SERVICE_PROPERTY);
73 serviceRegistry.put(service, serviceInstance);
76 public void unregisterService(final ServiceReference ref) {
77 serviceRegistry.remove(ref.getProperty(AbstractServiceInstance.SERVICE_PROPERTY));
80 public Service getNextServiceInPipeline(Service service) {
81 int index = staticPipeline.indexOf(service);
82 if (index >= staticPipeline.size() - 1) return null;
83 return staticPipeline.get(index + 1);
87 public AbstractServiceInstance getServiceInstance(Service service) {
88 if (service == null) return null;
89 return serviceRegistry.get(service);
93 eventHandler = Executors.newSingleThreadExecutor();
94 this.queue = new LinkedBlockingQueue<String>();
95 NotificationProviderService notificationService = mdsalConsumer.getNotificationService();
96 if (notificationService != null) {
97 notificationService.registerNotificationListener(this);
100 public void start() {
101 eventHandler.submit(new Runnable() {
106 String nodeId = queue.take();
108 * Since we are hooking on OpendaylightInventoryListener and as observed in
109 * Bug 1997 multiple Threads trying to write to a same table at the same time
110 * causes programming issues. Hence delaying the programming by a second to
111 * avoid the clash. This hack/workaround should be removed once Bug 1997 is resolved.
114 for (Service service : staticPipeline) {
115 AbstractServiceInstance serviceInstance = getServiceInstance(service);
116 serviceInstance.programDefaultPipelineRule(nodeId);
119 } catch (Exception e) {
120 logger.warn("Processing interrupted, terminating ", e);
123 while (!queue.isEmpty()) {
133 eventHandler.shutdownNow();
136 void enqueue(String nodeId) {
138 queue.put(new String(nodeId));
139 } catch (InterruptedException e) {
140 logger.warn("Failed to enqueue operation {}", nodeId, e);
146 * Process the Node update notification. Check for Openflow node and make sure if the bridge is part of the Pipeline before
147 * programming the Pipeline specific flows.
150 public void onNodeUpdated(NodeUpdated nodeUpdated) {
151 NodeRef ref = nodeUpdated.getNodeRef();
152 InstanceIdentifier<Node> identifier = (InstanceIdentifier<Node>) ref.getValue();
153 logger.debug("Node Update received for : "+identifier.toString());
154 final NodeKey key = identifier.firstKeyOf(Node.class, NodeKey.class);
155 final String nodeId = key.getId().getValue();
156 if (key != null && key.getId().getValue().contains("openflow")) {
157 InstanceIdentifierBuilder<Node> builder = ((InstanceIdentifier<Node>) ref.getValue()).builder();
158 InstanceIdentifierBuilder<FlowCapableNode> augmentation = builder.augmentation(FlowCapableNode.class);
159 final InstanceIdentifier<FlowCapableNode> path = augmentation.build();
160 BindingTransactionChain txChain = mdsalConsumer.getDataBroker().createTransactionChain(this);
161 CheckedFuture readFuture = txChain.newReadWriteTransaction().read(LogicalDatastoreType.OPERATIONAL, path);
162 Futures.addCallback(readFuture, new FutureCallback<Optional<? extends DataObject>>() {
164 public void onSuccess(Optional<? extends DataObject> optional) {
165 if (!optional.isPresent()) {
171 public void onFailure(Throwable throwable) {
172 logger.debug(String.format("Can't retrieve node data for node %s. Writing node data with table0.", nodeId));
180 public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
181 final Throwable cause) {
182 logger.error("Failed to export Flow Capable Inventory, Transaction {} failed.",transaction.getIdentifier(),cause);
186 public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
190 public void onNodeConnectorRemoved(NodeConnectorRemoved arg0) {
191 // TODO Auto-generated method stub
196 public void onNodeConnectorUpdated(NodeConnectorUpdated arg0) {
197 // TODO Auto-generated method stub
202 public void onNodeRemoved(NodeRemoved arg0) {
203 // TODO Auto-generated method stub