Use ClusteredDataChangeListener for dhcps 06/32506/1
authorVishal Thapar <vishal.thapar@ericsson.com>
Wed, 13 Jan 2016 13:58:27 +0000 (14:58 +0100)
committerVishal Thapar <vishal.thapar@ericsson.com>
Wed, 13 Jan 2016 13:58:27 +0000 (14:58 +0100)
1. Adds AsycClusteredDataChangeListenerBase class which is abstract class
extending ClusteredDataChangeListener.
2. Modifies DhcpConfigListener to use AsyncClusteredDataChangeListener
class.

Change-Id: Iabddcc83c38a04179738c9af85599c753ef4699a
Signed-off-by: Vishal Thapar <vishal.thapar@ericsson.com>
dhcpservice/dhcpservice-impl/src/main/java/org/opendaylight/vpnservice/dhcpservice/DhcpConfigListener.java
mdsalutil/mdsalutil-api/src/main/java/org/opendaylight/vpnservice/datastoreutils/AsyncClusteredDataChangeListenerBase.java [new file with mode: 0644]

index 57512b4cb3546047f39d86a455dc3dcb5058df62..2d187055ac88bcdb7d548941eb7e24dab6065c6e 100644 (file)
@@ -7,8 +7,10 @@
  */
 package org.opendaylight.vpnservice.dhcpservice;
 
-import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.neutronvpn.rev150602.dhcp.config.Configs;
+import org.opendaylight.controller.md.sal.binding.api.ClusteredDataChangeListener;
 
+import org.opendaylight.vpnservice.datastoreutils.AsyncClusteredDataChangeListenerBase;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.neutronvpn.rev150602.dhcp.config.Configs;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.vpnservice.datastoreutils.AsyncDataChangeListenerBase;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.vpnservice.neutronvpn.rev150602.DhcpConfig;
@@ -22,7 +24,7 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class DhcpConfigListener extends AsyncDataChangeListenerBase<DhcpConfig, DhcpConfigListener> implements AutoCloseable {
+public class DhcpConfigListener extends AsyncClusteredDataChangeListenerBase<DhcpConfig, DhcpConfigListener> implements AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(DhcpConfigListener.class);
 
@@ -98,7 +100,7 @@ public class DhcpConfigListener extends AsyncDataChangeListenerBase<DhcpConfig,
     }
 
     @Override
-    protected DataChangeListener getDataChangeListener() {
+    protected ClusteredDataChangeListener getDataChangeListener() {
         return DhcpConfigListener.this;
     }
 
diff --git a/mdsalutil/mdsalutil-api/src/main/java/org/opendaylight/vpnservice/datastoreutils/AsyncClusteredDataChangeListenerBase.java b/mdsalutil/mdsalutil-api/src/main/java/org/opendaylight/vpnservice/datastoreutils/AsyncClusteredDataChangeListenerBase.java
new file mode 100644 (file)
index 0000000..612d72e
--- /dev/null
@@ -0,0 +1,184 @@
+/*
+ * Copyright (c) 2015 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.vpnservice.datastoreutils;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.md.sal.binding.api.*;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+public abstract class AsyncClusteredDataChangeListenerBase<T extends DataObject, K extends ClusteredDataChangeListener> implements ClusteredDataChangeListener, AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(AsyncClusteredDataChangeListenerBase.class);
+
+    private static final int DATATREE_CHANGE_HANDLER_THREAD_POOL_CORE_SIZE = 1;
+    private static final int DATATREE_CHANGE_HANDLER_THREAD_POOL_MAX_SIZE = 1;
+    private static final int DATATREE_CHANGE_HANDLER_THREAD_POOL_KEEP_ALIVE_TIME_SECS = 300;
+    private static final int STARTUP_LOOP_TICK = 500;
+    private static final int STARTUP_LOOP_MAX_RETRIES = 8;
+
+    private static ThreadPoolExecutor dataChangeHandlerExecutor = new ThreadPoolExecutor(
+            DATATREE_CHANGE_HANDLER_THREAD_POOL_CORE_SIZE,
+            DATATREE_CHANGE_HANDLER_THREAD_POOL_MAX_SIZE,
+            DATATREE_CHANGE_HANDLER_THREAD_POOL_KEEP_ALIVE_TIME_SECS,
+            TimeUnit.SECONDS,
+            new LinkedBlockingQueue<Runnable>());
+
+    private ListenerRegistration<K> listenerRegistration;
+    protected final Class<T> clazz;
+    private final Class<K> eventClazz;
+
+    /**
+     * @param clazz - for which the data change event is received
+     */
+    public AsyncClusteredDataChangeListenerBase(Class<T> clazz, Class<K> eventClazz) {
+        this.clazz = Preconditions.checkNotNull(clazz, "Class can not be null!");
+        this.eventClazz = Preconditions.checkNotNull(eventClazz, "eventClazz can not be null!");
+    }
+
+    public void registerListener(final LogicalDatastoreType dsType, final DataBroker db) {
+        try {
+            TaskRetryLooper looper = new TaskRetryLooper(STARTUP_LOOP_TICK, STARTUP_LOOP_MAX_RETRIES);
+            listenerRegistration = looper.loopUntilNoException(new Callable<ListenerRegistration<K>>() {
+                @Override
+                public ListenerRegistration call() throws Exception {
+                    return db.registerDataChangeListener(dsType, getWildCardPath(), getDataChangeListener(), getDataChangeScope());
+                }
+            });
+        } catch (final Exception e) {
+            LOG.warn("{}: Data Tree Change listener registration failed.", eventClazz.getName());
+            LOG.debug("{}: Data Tree Change listener registration failed: {}", eventClazz.getName(), e);
+            throw new IllegalStateException( eventClazz.getName() + "{}startup failed. System needs restart.", e);
+        }
+    }
+
+    @Override
+    public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> changeEvent) {
+        if (changeEvent == null) {
+            return;
+        }
+
+        DataChangeHandler dataChangeHandler = new DataChangeHandler(changeEvent);
+        dataChangeHandlerExecutor.execute(dataChangeHandler);
+    }
+
+    @SuppressWarnings("unchecked")
+    private void createData(final Map<InstanceIdentifier<?>, DataObject> createdData) {
+        final Set<InstanceIdentifier<?>> keys = createdData.keySet() != null
+                ? createdData.keySet() : Collections.<InstanceIdentifier<?>>emptySet();
+        for (InstanceIdentifier<?> key : keys) {
+            if (clazz.equals(key.getTargetType())) {
+                InstanceIdentifier<T> createKeyIdent = key.firstIdentifierOf(clazz);
+                final Optional<DataObject> value = Optional.of(createdData.get(key));
+                if (value.isPresent()) {
+                    this.add(createKeyIdent, (T)value.get());
+                }
+            }
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private void updateData(final Map<InstanceIdentifier<?>, DataObject> updateData,
+                            final Map<InstanceIdentifier<?>, DataObject> originalData) {
+
+        final Set<InstanceIdentifier<?>> keys = updateData.keySet() != null
+                ? updateData.keySet() : Collections.<InstanceIdentifier<?>>emptySet();
+        for (InstanceIdentifier<?> key : keys) {
+            if (clazz.equals(key.getTargetType())) {
+                InstanceIdentifier<T> updateKeyIdent = key.firstIdentifierOf(clazz);
+                final Optional<DataObject> value = Optional.of(updateData.get(key));
+                final Optional<DataObject> original = Optional.of(originalData.get(key));
+                if (value.isPresent() && original.isPresent()) {
+                    this.update(updateKeyIdent, (T) original.get(), (T) value.get());
+                }
+            }
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private void removeData(final Set<InstanceIdentifier<?>> removeData,
+                            final Map<InstanceIdentifier<?>, DataObject> originalData) {
+
+        for (InstanceIdentifier<?> key : removeData) {
+            if (clazz.equals(key.getTargetType())) {
+                final InstanceIdentifier<T> ident = key.firstIdentifierOf(clazz);
+                final DataObject removeValue = originalData.get(key);
+                this.remove(ident, (T)removeValue);
+            }
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (listenerRegistration != null) {
+            try {
+                listenerRegistration.close();
+            } catch (final Exception e) {
+                LOG.error("Error when cleaning up ClusteredDataChangeListener.", e);
+            }
+            listenerRegistration = null;
+        }
+        LOG.info("Interface Manager Closed");
+    }
+
+    protected abstract void remove(InstanceIdentifier<T> identifier, T del);
+
+    protected abstract void update(InstanceIdentifier<T> identifier, T original, T update);
+
+    protected abstract void add(InstanceIdentifier<T> identifier, T add);
+
+    protected abstract InstanceIdentifier<T> getWildCardPath();
+
+    protected abstract ClusteredDataChangeListener getDataChangeListener();
+
+    protected abstract AsyncDataBroker.DataChangeScope getDataChangeScope();
+
+    public class DataChangeHandler implements Runnable {
+        final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> changeEvent;
+
+        public DataChangeHandler(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> changeEvent) {
+            this.changeEvent = changeEvent;
+        }
+
+        @Override
+        public void run() {
+            Preconditions.checkNotNull(changeEvent,"Async ChangeEvent can not be null!");
+
+            /* All DataObjects for create */
+            final Map<InstanceIdentifier<?>, DataObject> createdData = changeEvent.getCreatedData() != null
+                    ? changeEvent.getCreatedData() : Collections.<InstanceIdentifier<?>, DataObject>emptyMap();
+            /* All DataObjects for remove */
+            final Set<InstanceIdentifier<?>> removeData = changeEvent.getRemovedPaths() != null
+                    ? changeEvent.getRemovedPaths() : Collections.<InstanceIdentifier<?>>emptySet();
+            /* All DataObjects for updates */
+            final Map<InstanceIdentifier<?>, DataObject> updateData = changeEvent.getUpdatedData() != null
+                    ? changeEvent.getUpdatedData() : Collections.<InstanceIdentifier<?>, DataObject>emptyMap();
+            /* All Original DataObjects */
+            final Map<InstanceIdentifier<?>, DataObject> originalData = changeEvent.getOriginalData() != null
+                    ? changeEvent.getOriginalData() : Collections.<InstanceIdentifier<?>, DataObject>emptyMap();
+
+            createData(createdData);
+            updateData(updateData, originalData);
+            removeData(removeData, originalData);
+        }
+    }
+}