--- /dev/null
+/*
+ * Copyright (C) 2014 Red Hat, Inc.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Authors : Dave Tucker, Flavio Fernandes
+ */
+
+package org.opendaylight.ovsdb.openstack.netvirt;
+
+/**
+ * Abstract class for events used by neutron northbound and southbound events.
+ */
+public abstract class AbstractEvent {
+ public enum HandlerType {
+ SOUTHBOUND,
+ NEUTRON_FLOATING_IP,
+ NEUTRON_NETWORK,
+ NEUTRON_PORT,
+ NEUTRON_PORT_SECURITY,
+ NEUTRON_ROUTER,
+ NEUTRON_SUBNET,
+ NEUTRON_FWAAS;
+
+ public static final int size = HandlerType.values().length;
+ }
+ public enum Action { ADD, UPDATE, DELETE }
+
+ private HandlerType handlerType;
+ private Action action;
+
+ private AbstractEvent() {
+ // this is private to force proper construction
+ }
+
+ protected AbstractEvent(HandlerType handlerType, Action action) {
+ this.handlerType = handlerType;
+ this.action = action;
+ }
+
+ public HandlerType getHandlerType() {
+ return handlerType;
+ }
+
+ public Action getAction() {
+ return action;
+ }
+
+ @Override
+ public String toString() {
+ return "AbstractEvent [handlerType=" + handlerType + " action=" + action + "]";
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((handlerType == null) ? 0 : handlerType.hashCode());
+ result = prime * result + ((action == null) ? 0 : action.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ AbstractEvent other = (AbstractEvent) obj;
+ if (handlerType == null) {
+ if (other.handlerType != null)
+ return false;
+ } else if (!handlerType.equals(other.handlerType))
+ return false;
+ if (action == null) {
+ if (other.action != null)
+ return false;
+ } else if (!action.equals(other.action))
+ return false;
+ return true;
+ }
+}
import org.opendaylight.controller.sal.utils.Status;
import org.opendaylight.controller.sal.utils.StatusCode;
+import org.opendaylight.ovsdb.openstack.netvirt.api.EventDispatcher;
+import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.HttpURLConnection;
/**
- * Abstract class for utility functions used by neutron handlers.
+ * OpenStack related events originate from multiple north callbacks as well as south.
+ * This interface provides a layer of abstraction between the event dispatcher and the
+ * handlers.
*/
public abstract class AbstractHandler {
*/
static final Logger logger = LoggerFactory.getLogger(AbstractHandler.class);
+ // The implementation for each of these services is resolved by the OSGi Service Manager
+ private volatile EventDispatcher eventDispatcher;
+
/**
* Convert failure status returned by the manager into
* neutron API service errors.
return result;
}
+
+ /**
+ * Enqueue the event.
+ *
+ * @param event the {@link org.opendaylight.ovsdb.openstack.netvirt.AbstractEvent} event to be handled.
+ * @see org.opendaylight.ovsdb.openstack.netvirt.api.EventDispatcher
+ */
+ protected void enqueueEvent(AbstractEvent abstractEvent) {
+ Preconditions.checkNotNull(eventDispatcher);
+ eventDispatcher.enqueueEvent(abstractEvent);
+ }
+
+ /**
+ * Process the event.
+ *
+ * @param event the {@link org.opendaylight.ovsdb.openstack.netvirt.AbstractEvent} event to be handled.
+ * @see org.opendaylight.ovsdb.openstack.netvirt.api.EventDispatcher
+ */
+ public abstract void processEvent(AbstractEvent abstractEvent);
+
}
import org.opendaylight.controller.sal.core.ComponentActivatorAbstractBase;
import org.opendaylight.controller.switchmanager.IInventoryListener;
import org.opendaylight.ovsdb.openstack.netvirt.api.BridgeConfigurationManager;
+import org.opendaylight.ovsdb.openstack.netvirt.api.Constants;
+import org.opendaylight.ovsdb.openstack.netvirt.api.EventDispatcher;
import org.opendaylight.ovsdb.openstack.netvirt.api.NetworkingProvider;
import org.opendaylight.ovsdb.openstack.netvirt.api.NetworkingProviderManager;
import org.opendaylight.ovsdb.openstack.netvirt.api.TenantNetworkManager;
import org.opendaylight.ovsdb.openstack.netvirt.api.VlanConfigurationCache;
import org.opendaylight.ovsdb.openstack.netvirt.impl.BridgeConfigurationManagerImpl;
import org.opendaylight.ovsdb.openstack.netvirt.impl.ConfigurationServiceImpl;
+import org.opendaylight.ovsdb.openstack.netvirt.impl.EventDispatcherImpl;
import org.opendaylight.ovsdb.openstack.netvirt.impl.ProviderNetworkManagerImpl;
import org.opendaylight.ovsdb.openstack.netvirt.impl.TenantNetworkManagerImpl;
import org.opendaylight.ovsdb.openstack.netvirt.impl.VlanConfigurationCacheImpl;
import org.apache.felix.dm.Component;
+import java.util.Properties;
+
/**
* OSGi bundle activator for the OVSDB Neutron Interface.
*/
RouterHandler.class,
SouthboundHandler.class,
PortSecurityHandler.class,
- ProviderNetworkManagerImpl.class};
+ ProviderNetworkManagerImpl.class,
+ EventDispatcherImpl.class};
return res;
}
}
if (imp.equals(FloatingIPHandler.class)) {
- c.setInterface(INeutronFloatingIPAware.class.getName(), null);
+ Properties floatingIPHandlerPorperties = new Properties();
+ floatingIPHandlerPorperties.put(Constants.EVENT_HANDLER_TYPE_PROPERTY,
+ AbstractEvent.HandlerType.NEUTRON_FLOATING_IP);
+ c.setInterface(new String[] {INeutronFloatingIPAware.class.getName(),
+ AbstractHandler.class.getName()},
+ floatingIPHandlerPorperties);
c.add(createServiceDependency().setService(OvsdbConfigurationService.class).setRequired(true));
c.add(createServiceDependency().setService(OvsdbConnectionService.class).setRequired(true));
c.add(createServiceDependency().setService(OvsdbInventoryListener.class).setRequired(true));
+ c.add(createServiceDependency().setService(EventDispatcher.class).setRequired(true));
}
if (imp.equals(NetworkHandler.class)) {
- c.setInterface(INeutronNetworkAware.class.getName(), null);
+ Properties networkHandlerProperties = new Properties();
+ networkHandlerProperties.put(Constants.EVENT_HANDLER_TYPE_PROPERTY,
+ AbstractEvent.HandlerType.NEUTRON_NETWORK);
+ c.setInterface(new String[] {INeutronNetworkAware.class.getName(),
+ AbstractHandler.class.getName()},
+ networkHandlerProperties);
c.add(createServiceDependency().setService(TenantNetworkManager.class).setRequired(true));
c.add(createServiceDependency().setService(BridgeConfigurationManager.class).setRequired(true));
c.add(createServiceDependency().setService(
c.add(createServiceDependency().setService(OvsdbConnectionService.class).setRequired(true));
c.add(createServiceDependency().setService(INeutronNetworkCRUD.class).setRequired(true));
c.add(createServiceDependency().setService(OvsdbInventoryListener.class).setRequired(true));
+ c.add(createServiceDependency().setService(EventDispatcher.class).setRequired(true));
}
if (imp.equals(SubnetHandler.class)) {
- c.setInterface(INeutronSubnetAware.class.getName(), null);
+ Properties subnetHandlerProperties = new Properties();
+ subnetHandlerProperties.put(Constants.EVENT_HANDLER_TYPE_PROPERTY, AbstractEvent.HandlerType.NEUTRON_SUBNET);
+ c.setInterface(new String[] {INeutronSubnetAware.class.getName(),
+ AbstractHandler.class.getName()},
+ subnetHandlerProperties);
+ c.add(createServiceDependency().setService(EventDispatcher.class).setRequired(true));
}
if (imp.equals(PortHandler.class)) {
- c.setInterface(INeutronPortAware.class.getName(), null);
+ Properties portHandlerProperties = new Properties();
+ portHandlerProperties.put(Constants.EVENT_HANDLER_TYPE_PROPERTY, AbstractEvent.HandlerType.NEUTRON_PORT);
+ c.setInterface(new String[] {INeutronPortAware.class.getName(),
+ AbstractHandler.class.getName()},
+ portHandlerProperties);
c.add(createServiceDependency().setService(OvsdbConfigurationService.class).setRequired(true));
c.add(createServiceDependency().setService(OvsdbConnectionService.class).setRequired(true));
c.add(createServiceDependency().setService(OvsdbInventoryListener.class).setRequired(true));
+ c.add(createServiceDependency().setService(EventDispatcher.class).setRequired(true));
}
if (imp.equals(RouterHandler.class)) {
- c.setInterface(INeutronRouterAware.class.getName(), null);
+ Properties routerHandlerProperties = new Properties();
+ routerHandlerProperties.put(Constants.EVENT_HANDLER_TYPE_PROPERTY, AbstractEvent.HandlerType.NEUTRON_ROUTER);
+ c.setInterface(new String[] {INeutronRouterAware.class.getName(),
+ AbstractHandler.class.getName()},
+ routerHandlerProperties);
c.add(createServiceDependency().setService(OvsdbConfigurationService.class).setRequired(true));
c.add(createServiceDependency().setService(OvsdbConnectionService.class).setRequired(true));
c.add(createServiceDependency().setService(OvsdbInventoryListener.class).setRequired(true));
+ c.add(createServiceDependency().setService(EventDispatcher.class).setRequired(true));
}
if (imp.equals(SouthboundHandler.class)) {
+ Properties southboundHandlerProperties = new Properties();
+ southboundHandlerProperties.put(Constants.EVENT_HANDLER_TYPE_PROPERTY, AbstractEvent.HandlerType.SOUTHBOUND);
c.setInterface(new String[] {OvsdbInventoryListener.class.getName(),
- IInventoryListener.class.getName()}, null);
+ IInventoryListener.class.getName(),
+ AbstractHandler.class.getName()},
+ southboundHandlerProperties);
c.add(createServiceDependency().setService(
org.opendaylight.ovsdb.openstack.netvirt.api.ConfigurationService.class).setRequired(true));
c.add(createServiceDependency().setService(BridgeConfigurationManager.class).setRequired(true));
c.add(createServiceDependency().setService(NetworkingProviderManager.class).setRequired(true));
c.add(createServiceDependency().setService(OvsdbConfigurationService.class).setRequired(true));
c.add(createServiceDependency().setService(OvsdbConnectionService.class).setRequired(true));
+ c.add(createServiceDependency().setService(EventDispatcher.class).setRequired(true));
}
if (imp.equals(PortSecurityHandler.class)) {
+ Properties portSecurityHandlerProperties = new Properties();
+ portSecurityHandlerProperties.put(Constants.EVENT_HANDLER_TYPE_PROPERTY,
+ AbstractEvent.HandlerType.NEUTRON_PORT_SECURITY);
c.setInterface(new String[] {INeutronSecurityRuleAware.class.getName(),
- INeutronSecurityGroupAware.class.getName()}, null);
+ INeutronSecurityGroupAware.class.getName(),
+ AbstractHandler.class.getName()},
+ portSecurityHandlerProperties);
+ c.add(createServiceDependency().setService(EventDispatcher.class).setRequired(true));
}
if (imp.equals(ProviderNetworkManagerImpl.class)) {
.setService(NetworkingProvider.class)
.setCallbacks("providerAdded", "providerRemoved"));
}
+
+ if (imp.equals(EventDispatcherImpl.class)) {
+ c.setInterface(EventDispatcher.class.getName(), null);
+ c.add(createServiceDependency()
+ .setService(AbstractHandler.class)
+ .setCallbacks("eventHandlerAdded", "eventHandlerRemoved"));
+ }
}
}
*/
static final Logger logger = LoggerFactory.getLogger(FloatingIPHandler.class);
+ // The implementation for each of these services is resolved by the OSGi Service Manager
private volatile OvsdbConfigurationService ovsdbConfigurationService;
private volatile OvsdbConnectionService connectionService;
private volatile OvsdbInventoryListener ovsdbInventoryListener;
floatingIP.getFixedIPAddress(),
floatingIP.getFloatingIPUUID());
}
+
+ /**
+ * Process the event.
+ *
+ * @param abstractEvent the {@link org.opendaylight.ovsdb.openstack.netvirt.AbstractEvent} event to be handled.
+ * @see EventDispatcher
+ */
+ @Override
+ public void processEvent(AbstractEvent abstractEvent) {
+ if (!(abstractEvent instanceof NorthboundEvent)) {
+ logger.error("Unable to process abstract event " + abstractEvent);
+ return;
+ }
+ NorthboundEvent ev = (NorthboundEvent) abstractEvent;
+ switch (ev.getAction()) {
+ // TODO: add handling of events here, once callbacks do something
+ // other than logging.
+ default:
+ logger.warn("Unable to process event action " + ev.getAction());
+ break;
+ }
+ }
}
}
tenantNetworkManager.networkDeleted(network.getID());
}
+
+ /**
+ * Process the event.
+ *
+ * @param abstractEvent the {@link org.opendaylight.ovsdb.openstack.netvirt.AbstractEvent} event to be handled.
+ * @see org.opendaylight.ovsdb.openstack.netvirt.api.EventDispatcher
+ */
+ @Override
+ public void processEvent(AbstractEvent abstractEvent) {
+ if (!(abstractEvent instanceof NorthboundEvent)) {
+ logger.error("Unable to process abstract event " + abstractEvent);
+ return;
+ }
+ NorthboundEvent ev = (NorthboundEvent) abstractEvent;
+ switch (ev.getAction()) {
+ // TODO: add handling of events here, once callbacks do something
+ // other than logging.
+ default:
+ logger.warn("Unable to process event action " + ev.getAction());
+ break;
+ }
+ }
+
}
--- /dev/null
+/*
+ * Copyright (C) 2014 Red Hat, Inc.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Authors : Dave Tucker, Flavio Fernandes
+ */
+
+package org.opendaylight.ovsdb.openstack.netvirt;
+
+import org.opendaylight.controller.networkconfig.neutron.NeutronPort;
+
+public class NorthboundEvent extends AbstractEvent {
+
+ private NeutronPort port;
+
+ NorthboundEvent(NeutronPort port, Action action) {
+ super(HandlerType.NEUTRON_PORT, action);
+ this.port = port;
+ }
+
+ public NeutronPort getPort() {
+ return port;
+ }
+
+ @Override
+ public String toString() {
+ return "NorthboundEvent [action=" + super.getAction() + ", port=" + port + "]";
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = super.hashCode();
+ result = prime * result + ((port == null) ? 0 : port.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ if (!super.equals(obj))
+ return false;
+ NorthboundEvent other = (NorthboundEvent) obj;
+ if (port == null) {
+ if (other.port != null)
+ return false;
+ } else if (!port.equals(other.port))
+ return false;
+ return true;
+ }
+}
*/
static final Logger logger = LoggerFactory.getLogger(PortHandler.class);
+ // The implementation for each of these services is resolved by the OSGi Service Manager
private volatile OvsdbConfigurationService ovsdbConfigurationService;
private volatile OvsdbConnectionService connectionService;
private volatile OvsdbInventoryListener ovsdbInventoryListener;
return;
}
+ enqueueEvent(new NorthboundEvent(port, NorthboundEvent.Action.ADD));
+ }
+ private void doNeutronPortCreated(NeutronPort port) {
logger.debug(" Port-ADD successful for tenant-id - {}," +
- " network-id - {}, port-id - {}, result - {} ",
+ " network-id - {}, port-id - {}",
port.getTenantID(), port.getNetworkUUID(),
- port.getID(), result);
+ port.getID());
}
/**
return;
}
+ enqueueEvent(new NorthboundEvent(neutronPort, NorthboundEvent.Action.DELETE));
+ }
+ private void doNeutronPortDeleted(NeutronPort neutronPort) {
+ logger.debug("Handling neutron delete port " + neutronPort);
+
List<Node> nodes = connectionService.getNodes();
for (Node node : nodes) {
try {
neutronPort.getID());
}
+
+ /**
+ * Process the event.
+ *
+ * @param abstractEvent the {@link org.opendaylight.ovsdb.openstack.netvirt.AbstractEvent} event to be handled.
+ * @see EventDispatcher
+ */
+ @Override
+ public void processEvent(AbstractEvent abstractEvent) {
+ if (!(abstractEvent instanceof NorthboundEvent)) {
+ logger.error("Unable to process abstract event " + abstractEvent);
+ return;
+ }
+ NorthboundEvent ev = (NorthboundEvent) abstractEvent;
+ switch (ev.getAction()) {
+ case ADD:
+ doNeutronPortCreated(ev.getPort());
+ break;
+ case DELETE:
+ doNeutronPortDeleted(ev.getPort());
+ break;
+ default:
+ logger.warn("Unable to process event action " + ev.getAction());
+ break;
+ }
+ }
}
return;
}
}
+
+ /**
+ * Process the event.
+ *
+ * @param abstractEvent the {@link org.opendaylight.ovsdb.openstack.netvirt.AbstractEvent} event to be handled.
+ * @see org.opendaylight.ovsdb.openstack.netvirt.api.EventDispatcher
+ */
+ @Override
+ public void processEvent(AbstractEvent abstractEvent) {
+ if (!(abstractEvent instanceof NorthboundEvent)) {
+ logger.error("Unable to process abstract event " + abstractEvent);
+ return;
+ }
+ NorthboundEvent ev = (NorthboundEvent) abstractEvent;
+ switch (ev.getAction()) {
+ // TODO: add handling of events here, once callbacks do something
+ // other than logging.
+ default:
+ logger.warn("Unable to process event action " + ev.getAction());
+ break;
+ }
+ }
+
}
\ No newline at end of file
*/
static final Logger logger = LoggerFactory.getLogger(RouterHandler.class);
+ // The implementation for each of these services is resolved by the OSGi Service Manager
private volatile OvsdbConfigurationService ovsdbConfigurationService;
private volatile OvsdbConnectionService connectionService;
private volatile OvsdbInventoryListener ovsdbInventoryListener;
logger.debug(" Router {} interface {} detached. Subnet {}", router.getName(), routerInterface.getPortUUID(),
routerInterface.getSubnetUUID());
}
+
+ /**
+ * Process the event.
+ *
+ * @param abstractEvent the {@link org.opendaylight.ovsdb.openstack.netvirt.AbstractEvent} event to be handled.
+ * @see org.opendaylight.ovsdb.openstack.netvirt.api.EventDispatcher
+ */
+ @Override
+ public void processEvent(AbstractEvent abstractEvent) {
+ if (!(abstractEvent instanceof NorthboundEvent)) {
+ logger.error("Unable to process abstract event " + abstractEvent);
+ return;
+ }
+ NorthboundEvent ev = (NorthboundEvent) abstractEvent;
+ switch (ev.getAction()) {
+ // TODO: add handling of events here, once callbacks do something
+ // other than logging.
+ default:
+ logger.warn("Unable to process event action " + ev.getAction());
+ break;
+ }
+ }
}
import org.opendaylight.controller.sal.core.Node;
import org.opendaylight.ovsdb.lib.notation.Row;
-public class SouthboundEvent {
+public class SouthboundEvent extends AbstractEvent {
public enum Type { NODE, ROW };
- public enum Action { ADD, UPDATE, DELETE };
private Type type;
- private Action action;
private Node node;
private String tableName;
private String uuid;
private Row row;
private Object context;
public SouthboundEvent(Node node, Action action) {
- super();
+ super(HandlerType.SOUTHBOUND, action);
this.type = Type.NODE;
- this.action = action;
this.node = node;
}
public SouthboundEvent(Node node, String tableName, String uuid, Row row, Action action) {
- super();
+ super(HandlerType.SOUTHBOUND, action);
this.type = Type.ROW;
- this.action = action;
this.node = node;
this.tableName = tableName;
this.uuid = uuid;
this.row = row;
}
public SouthboundEvent(Node node, String tableName, String uuid, Row row, Object context, Action action) {
- super();
+ super(HandlerType.SOUTHBOUND, action);
this.type = Type.ROW;
- this.action = action;
this.node = node;
this.tableName = tableName;
this.uuid = uuid;
public Type getType() {
return type;
}
- public Action getAction() {
- return action;
- }
public Node getNode() {
return node;
}
}
@Override
public String toString() {
- return "SouthboundEvent [type=" + type + ", action=" + action + ", node=" + node + ", tableName=" + tableName
+ return "SouthboundEvent [type=" + type + ", action=" + super.getAction() + ", node=" + node + ", tableName=" + tableName
+ ", uuid=" + uuid + ", row=" + row + ", context=" + context.toString() + "]";
}
@Override
public int hashCode() {
final int prime = 31;
- int result = 1;
- result = prime * result + ((action == null) ? 0 : action.hashCode());
+ int result = super.hashCode();
result = prime * result + ((node == null) ? 0 : node.hashCode());
result = prime * result + ((tableName == null) ? 0 : tableName.hashCode());
result = prime * result + ((type == null) ? 0 : type.hashCode());
return false;
if (getClass() != obj.getClass())
return false;
- SouthboundEvent other = (SouthboundEvent) obj;
- if (action != other.action)
+ if (!super.equals(obj))
return false;
+ SouthboundEvent other = (SouthboundEvent) obj;
if (node == null) {
if (other.node != null)
return false;
return false;
} else if (!tableName.equals(other.tableName))
return false;
- if (type != other.type)
+ if (type == null) {
+ if (other.type != null)
+ return false;
+ } else if (!type.equals(other.type))
return false;
if (uuid == null) {
if (other.uuid != null)
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-public class SouthboundHandler extends AbstractHandler implements OvsdbInventoryListener, IInventoryListener {
+public class SouthboundHandler extends AbstractHandler implements OvsdbInventoryListener,
+ IInventoryListener {
static final Logger logger = LoggerFactory.getLogger(SouthboundHandler.class);
//private Thread eventThread;
- private ExecutorService eventHandler;
- private BlockingQueue<SouthboundEvent> events;
List<Node> nodeCache;
// The implementation for each of these services is resolved by the OSGi Service Manager
private volatile OvsdbConnectionService connectionService;
void init() {
- eventHandler = Executors.newSingleThreadExecutor();
- this.events = new LinkedBlockingQueue<>();
nodeCache = Lists.newArrayList();
}
void start() {
- eventHandler.submit(new Runnable() {
- @Override
- public void run() {
- while (true) {
- SouthboundEvent ev;
- try {
- ev = events.take();
- } catch (InterruptedException e) {
- logger.info("The event handler thread was interrupted, shutting down", e);
- return;
- }
- switch (ev.getType()) {
- case NODE:
- try {
- processNodeUpdate(ev.getNode(), ev.getAction());
- } catch (Exception e) {
- logger.error("Exception caught in ProcessNodeUpdate for node " + ev.getNode(), e);
- }
- break;
- case ROW:
- try {
- processRowUpdate(ev.getNode(), ev.getTableName(), ev.getUuid(), ev.getRow(),
- ev.getContext(),ev.getAction());
- } catch (Exception e) {
- logger.error("Exception caught in ProcessRowUpdate for node " + ev.getNode(), e);
- }
- break;
- default:
- logger.warn("Unable to process action " + ev.getAction() + " for node " + ev.getNode());
- }
- }
- }
- });
this.triggerUpdates();
}
- void stop() {
- // stop accepting new tasks
- eventHandler.shutdown();
- try {
- // Wait a while for existing tasks to terminate
- if (!eventHandler.awaitTermination(10, TimeUnit.SECONDS)) {
- eventHandler.shutdownNow();
- // Wait a while for tasks to respond to being cancelled
- if (!eventHandler.awaitTermination(10, TimeUnit.SECONDS))
- logger.error("Southbound Event Handler did not terminate");
- }
- } catch (InterruptedException e) {
- // (Re-)Cancel if current thread also interrupted
- eventHandler.shutdownNow();
- // Preserve interrupt status
- Thread.currentThread().interrupt();
- }
- }
-
@Override
public void nodeAdded(Node node, InetAddress address, int port) {
this.enqueueEvent(new SouthboundEvent(node, SouthboundEvent.Action.ADD));
}
/*
- * Ignore unneccesary updates to be even considered for processing.
+ * Ignore unnecessary updates to be even considered for processing.
* (Especially stats update are fast and furious).
*/
this.enqueueEvent(new SouthboundEvent(node, tableName, uuid, row, context, SouthboundEvent.Action.DELETE));
}
- private void enqueueEvent (SouthboundEvent event) {
- try {
- events.put(event);
- } catch (InterruptedException e) {
- logger.error("Thread was interrupted while trying to enqueue event ", e);
- }
- }
-
public void processNodeUpdate(Node node, SouthboundEvent.Action action) {
if (action == SouthboundEvent.Action.DELETE) return;
logger.trace("Process Node added {}", node);
}
}
}
+
+ /**
+ * Process the event.
+ *
+ * @param abstractEvent the {@link org.opendaylight.ovsdb.openstack.netvirt.AbstractEvent} event to be handled.
+ * @see EventDispatcher
+ */
+ @Override
+ public void processEvent(AbstractEvent abstractEvent) {
+ if (!(abstractEvent instanceof SouthboundEvent)) {
+ logger.error("Unable to process abstract event " + abstractEvent);
+ return;
+ }
+ SouthboundEvent ev = (SouthboundEvent) abstractEvent;
+ switch (ev.getType()) {
+ case NODE:
+ try {
+ processNodeUpdate(ev.getNode(), ev.getAction());
+ } catch (Exception e) {
+ logger.error("Exception caught in ProcessNodeUpdate for node " + ev.getNode(), e);
+ }
+ break;
+ case ROW:
+ try {
+ processRowUpdate(ev.getNode(), ev.getTableName(), ev.getUuid(), ev.getRow(),
+ ev.getContext(),ev.getAction());
+ } catch (Exception e) {
+ logger.error("Exception caught in ProcessRowUpdate for node " + ev.getNode(), e);
+ }
+ break;
+ default:
+ logger.warn("Unable to process type " + ev.getType() +
+ " action " + ev.getAction() + " for node " + ev.getNode());
+ break;
+ }
+ }
}
// TODO Auto-generated method stub
}
+
+ /**
+ * Process the event.
+ *
+ * @param abstractEvent the {@link org.opendaylight.ovsdb.openstack.netvirt.AbstractEvent} event to be handled.
+ * @see org.opendaylight.ovsdb.openstack.netvirt.api.EventDispatcher
+ */
+ @Override
+ public void processEvent(AbstractEvent abstractEvent) {
+ if (!(abstractEvent instanceof NorthboundEvent)) {
+ logger.error("Unable to process abstract event " + abstractEvent);
+ return;
+ }
+ NorthboundEvent ev = (NorthboundEvent) abstractEvent;
+ switch (ev.getAction()) {
+ // TODO: add handling of events here, once callbacks do something
+ // other than logging.
+ default:
+ logger.warn("Unable to process event action " + ev.getAction());
+ break;
+ }
+ }
}
public static final String SOUTHBOUND_PROTOCOL_PROPERTY = "southboundProtocol";
public static final String PROVIDER_TYPE_PROPERTY = "providerType";
public static final String OPENFLOW_VERSION_PROPERTY = "openflowVersion";
+ public static final String EVENT_HANDLER_TYPE_PROPERTY = "eventHandlerType";
}
--- /dev/null
+/*
+ * Copyright (C) 2014 Red Hat, Inc.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Authors : Dave Tucker, Flavio Fernandes
+ */
+
+package org.opendaylight.ovsdb.openstack.netvirt.api;
+
+import org.opendaylight.ovsdb.openstack.netvirt.AbstractEvent;
+
+/**
+ * Openstack related events will be enqueued into a common event queue.
+ * This interface provides access to an event dispatcher, as well as registration to link dispatcher to which handlers
+ * dispatcher will utilize.
+ */
+public interface EventDispatcher {
+ /**
+ * Enqueue the event.
+ * @param event the {@link org.opendaylight.ovsdb.openstack.netvirt.AbstractEvent} event to be handled.
+ */
+ public void enqueueEvent(AbstractEvent event);
+}
+
--- /dev/null
+/*
+ * Copyright (C) 2014 Red Hat, Inc.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Authors : Dave Tucker, Flavio Fernandes
+ */
+
+package org.opendaylight.ovsdb.openstack.netvirt.impl;
+
+import org.opendaylight.ovsdb.openstack.netvirt.AbstractEvent;
+import org.opendaylight.ovsdb.openstack.netvirt.AbstractHandler;
+import org.opendaylight.ovsdb.openstack.netvirt.api.Constants;
+import org.opendaylight.ovsdb.openstack.netvirt.api.EventDispatcher;
+
+import org.osgi.framework.ServiceReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class EventDispatcherImpl implements EventDispatcher {
+
+ static final Logger logger = LoggerFactory.getLogger(EventDispatcher.class);
+ private ExecutorService eventHandler;
+ private BlockingQueue<AbstractEvent> events;
+
+ private AbstractHandler[] handlers;
+
+ void init() {
+ eventHandler = Executors.newSingleThreadExecutor();
+ this.events = new LinkedBlockingQueue<>();
+ this.handlers = new AbstractHandler[AbstractEvent.HandlerType.size];
+ }
+
+ void start() {
+ eventHandler.submit(new Runnable() {
+ @Override
+ public void run() {
+ while (true) {
+ AbstractEvent ev;
+ try {
+ ev = events.take();
+ } catch (InterruptedException e) {
+ logger.info("The event handler thread was interrupted, shutting down", e);
+ return;
+ }
+ dispatchEvent(ev);
+ }
+ }
+ });
+ logger.debug("event dispatcher is started");
+ }
+
+ void stop() {
+ // stop accepting new tasks
+ eventHandler.shutdown();
+ try {
+ // Wait a while for existing tasks to terminate
+ if (!eventHandler.awaitTermination(10, TimeUnit.SECONDS)) {
+ eventHandler.shutdownNow();
+ // Wait a while for tasks to respond to being cancelled
+ if (!eventHandler.awaitTermination(10, TimeUnit.SECONDS))
+ logger.error("Dispatcher's event handler did not terminate");
+ }
+ } catch (InterruptedException e) {
+ // (Re-)Cancel if current thread also interrupted
+ eventHandler.shutdownNow();
+ // Preserve interrupt status
+ Thread.currentThread().interrupt();
+ }
+ logger.debug("event dispatcher is stopped");
+ }
+
+ private void dispatchEvent(AbstractEvent ev) {
+ AbstractHandler handler = handlers[ev.getHandlerType().ordinal()];
+ if (handler == null) {
+ logger.warn("event dispatcher found no handler for " + ev);
+ return;
+ }
+
+ handler.processEvent(ev);
+ }
+
+ public void eventHandlerAdded(final ServiceReference ref, AbstractHandler handler){
+ Long pid = (Long) ref.getProperty(org.osgi.framework.Constants.SERVICE_ID);
+ Object handlerTypeObject = ref.getProperty(Constants.EVENT_HANDLER_TYPE_PROPERTY);
+ if (!(handlerTypeObject instanceof AbstractEvent.HandlerType)){
+ logger.error("Abstract handler reg failed to provide a valid handler type " + handlerTypeObject);
+ return;
+ }
+ AbstractEvent.HandlerType handlerType = (AbstractEvent.HandlerType) handlerTypeObject;
+ handlers[handlerType.ordinal()] = handler;
+
+ logger.debug("Event handler for type {} registered for {}, pid {}",
+ handlerType, handler.getClass().getName(), pid);
+ }
+
+ public void eventHandlerRemoved(final ServiceReference ref){
+ Long pid = (Long) ref.getProperty(org.osgi.framework.Constants.SERVICE_ID);
+ Object handlerTypeObject = ref.getProperty(Constants.EVENT_HANDLER_TYPE_PROPERTY);
+ if (!(handlerTypeObject instanceof AbstractEvent.HandlerType)){
+ logger.error("Abstract handler unreg failed to provide a valid handler type " + handlerTypeObject);
+ return;
+ }
+ AbstractEvent.HandlerType handlerType = (AbstractEvent.HandlerType) handlerTypeObject;
+ handlers[handlerType.ordinal()] = null;
+
+ logger.debug("Event handler for type {} unregistered pid {}", handlerType, pid);
+ }
+
+ /**
+ * Enqueue the event.
+ *
+ * @param event the {@link org.opendaylight.ovsdb.openstack.netvirt.AbstractEvent} event to be handled.
+ */
+ @Override
+ public void enqueueEvent(AbstractEvent event) {
+ if (event == null) {
+ return;
+ }
+
+ try {
+ events.put(event);
+ } catch (InterruptedException e) {
+ logger.error("Thread was interrupted while trying to enqueue event ", e);
+ }
+ }
+}