From: Vivek Srivastava Date: Thu, 14 Jan 2016 11:53:09 +0000 (+0000) Subject: Merge "Use ClusteredDataChangeListener for dhcps" X-Git-Tag: release/beryllium~54 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;h=46cb4d4957935340b8cda1ad254206dc2302f70c;hp=84eb40f762ad62cb0b1f71768bab07fc17d4a81d;p=vpnservice.git Merge "Use ClusteredDataChangeListener for dhcps" --- diff --git a/dhcpservice/dhcpservice-impl/src/main/java/org/opendaylight/vpnservice/dhcpservice/DhcpConfigListener.java b/dhcpservice/dhcpservice-impl/src/main/java/org/opendaylight/vpnservice/dhcpservice/DhcpConfigListener.java index 57512b4c..2d187055 100644 --- a/dhcpservice/dhcpservice-impl/src/main/java/org/opendaylight/vpnservice/dhcpservice/DhcpConfigListener.java +++ b/dhcpservice/dhcpservice-impl/src/main/java/org/opendaylight/vpnservice/dhcpservice/DhcpConfigListener.java @@ -7,8 +7,10 @@ */ package org.opendaylight.vpnservice.dhcpservice; -import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.neutronvpn.rev150602.dhcp.config.Configs; +import org.opendaylight.controller.md.sal.binding.api.ClusteredDataChangeListener; +import org.opendaylight.vpnservice.datastoreutils.AsyncClusteredDataChangeListenerBase; +import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.neutronvpn.rev150602.dhcp.config.Configs; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.vpnservice.datastoreutils.AsyncDataChangeListenerBase; import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.neutronvpn.rev150602.DhcpConfig; @@ -22,7 +24,7 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class DhcpConfigListener extends AsyncDataChangeListenerBase implements AutoCloseable { +public class DhcpConfigListener extends AsyncClusteredDataChangeListenerBase implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(DhcpConfigListener.class); @@ -98,7 +100,7 @@ public class DhcpConfigListener extends AsyncDataChangeListenerBase implements ClusteredDataChangeListener, AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(AsyncClusteredDataChangeListenerBase.class); + + private static final int DATATREE_CHANGE_HANDLER_THREAD_POOL_CORE_SIZE = 1; + private static final int DATATREE_CHANGE_HANDLER_THREAD_POOL_MAX_SIZE = 1; + private static final int DATATREE_CHANGE_HANDLER_THREAD_POOL_KEEP_ALIVE_TIME_SECS = 300; + private static final int STARTUP_LOOP_TICK = 500; + private static final int STARTUP_LOOP_MAX_RETRIES = 8; + + private static ThreadPoolExecutor dataChangeHandlerExecutor = new ThreadPoolExecutor( + DATATREE_CHANGE_HANDLER_THREAD_POOL_CORE_SIZE, + DATATREE_CHANGE_HANDLER_THREAD_POOL_MAX_SIZE, + DATATREE_CHANGE_HANDLER_THREAD_POOL_KEEP_ALIVE_TIME_SECS, + TimeUnit.SECONDS, + new LinkedBlockingQueue()); + + private ListenerRegistration listenerRegistration; + protected final Class clazz; + private final Class eventClazz; + + /** + * @param clazz - for which the data change event is received + */ + public AsyncClusteredDataChangeListenerBase(Class clazz, Class eventClazz) { + this.clazz = Preconditions.checkNotNull(clazz, "Class can not be null!"); + this.eventClazz = Preconditions.checkNotNull(eventClazz, "eventClazz can not be null!"); + } + + public void registerListener(final LogicalDatastoreType dsType, final DataBroker db) { + try { + TaskRetryLooper looper = new TaskRetryLooper(STARTUP_LOOP_TICK, STARTUP_LOOP_MAX_RETRIES); + listenerRegistration = looper.loopUntilNoException(new Callable>() { + @Override + public ListenerRegistration call() throws Exception { + return db.registerDataChangeListener(dsType, getWildCardPath(), getDataChangeListener(), getDataChangeScope()); + } + }); + } catch (final Exception e) { + LOG.warn("{}: Data Tree Change listener registration failed.", eventClazz.getName()); + LOG.debug("{}: Data Tree Change listener registration failed: {}", eventClazz.getName(), e); + throw new IllegalStateException( eventClazz.getName() + "{}startup failed. System needs restart.", e); + } + } + + @Override + public void onDataChanged(final AsyncDataChangeEvent, DataObject> changeEvent) { + if (changeEvent == null) { + return; + } + + DataChangeHandler dataChangeHandler = new DataChangeHandler(changeEvent); + dataChangeHandlerExecutor.execute(dataChangeHandler); + } + + @SuppressWarnings("unchecked") + private void createData(final Map, DataObject> createdData) { + final Set> keys = createdData.keySet() != null + ? createdData.keySet() : Collections.>emptySet(); + for (InstanceIdentifier key : keys) { + if (clazz.equals(key.getTargetType())) { + InstanceIdentifier createKeyIdent = key.firstIdentifierOf(clazz); + final Optional value = Optional.of(createdData.get(key)); + if (value.isPresent()) { + this.add(createKeyIdent, (T)value.get()); + } + } + } + } + + @SuppressWarnings("unchecked") + private void updateData(final Map, DataObject> updateData, + final Map, DataObject> originalData) { + + final Set> keys = updateData.keySet() != null + ? updateData.keySet() : Collections.>emptySet(); + for (InstanceIdentifier key : keys) { + if (clazz.equals(key.getTargetType())) { + InstanceIdentifier updateKeyIdent = key.firstIdentifierOf(clazz); + final Optional value = Optional.of(updateData.get(key)); + final Optional original = Optional.of(originalData.get(key)); + if (value.isPresent() && original.isPresent()) { + this.update(updateKeyIdent, (T) original.get(), (T) value.get()); + } + } + } + } + + @SuppressWarnings("unchecked") + private void removeData(final Set> removeData, + final Map, DataObject> originalData) { + + for (InstanceIdentifier key : removeData) { + if (clazz.equals(key.getTargetType())) { + final InstanceIdentifier ident = key.firstIdentifierOf(clazz); + final DataObject removeValue = originalData.get(key); + this.remove(ident, (T)removeValue); + } + } + } + + @Override + public void close() throws Exception { + if (listenerRegistration != null) { + try { + listenerRegistration.close(); + } catch (final Exception e) { + LOG.error("Error when cleaning up ClusteredDataChangeListener.", e); + } + listenerRegistration = null; + } + LOG.info("Interface Manager Closed"); + } + + protected abstract void remove(InstanceIdentifier identifier, T del); + + protected abstract void update(InstanceIdentifier identifier, T original, T update); + + protected abstract void add(InstanceIdentifier identifier, T add); + + protected abstract InstanceIdentifier getWildCardPath(); + + protected abstract ClusteredDataChangeListener getDataChangeListener(); + + protected abstract AsyncDataBroker.DataChangeScope getDataChangeScope(); + + public class DataChangeHandler implements Runnable { + final AsyncDataChangeEvent, DataObject> changeEvent; + + public DataChangeHandler(final AsyncDataChangeEvent, DataObject> changeEvent) { + this.changeEvent = changeEvent; + } + + @Override + public void run() { + Preconditions.checkNotNull(changeEvent,"Async ChangeEvent can not be null!"); + + /* All DataObjects for create */ + final Map, DataObject> createdData = changeEvent.getCreatedData() != null + ? changeEvent.getCreatedData() : Collections., DataObject>emptyMap(); + /* All DataObjects for remove */ + final Set> removeData = changeEvent.getRemovedPaths() != null + ? changeEvent.getRemovedPaths() : Collections.>emptySet(); + /* All DataObjects for updates */ + final Map, DataObject> updateData = changeEvent.getUpdatedData() != null + ? changeEvent.getUpdatedData() : Collections., DataObject>emptyMap(); + /* All Original DataObjects */ + final Map, DataObject> originalData = changeEvent.getOriginalData() != null + ? changeEvent.getOriginalData() : Collections., DataObject>emptyMap(); + + createData(createdData); + updateData(updateData, originalData); + removeData(removeData, originalData); + } + } +}