2 * Copyright (c) 2015 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.vpnservice.datastoreutils;
11 import com.google.common.base.Preconditions;
12 import org.opendaylight.controller.md.sal.binding.api.*;
13 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
14 import org.opendaylight.yangtools.concepts.ListenerRegistration;
15 import org.opendaylight.yangtools.yang.binding.DataObject;
16 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
17 import org.slf4j.Logger;
18 import org.slf4j.LoggerFactory;
20 import java.util.Collection;
21 import java.util.concurrent.Callable;
22 import java.util.concurrent.LinkedBlockingQueue;
23 import java.util.concurrent.ThreadPoolExecutor;
24 import java.util.concurrent.TimeUnit;
26 public abstract class AsyncDataTreeChangeListenerBase<T extends DataObject, K extends DataTreeChangeListener> implements DataTreeChangeListener<T>, AutoCloseable {
27 private static final Logger LOG = LoggerFactory.getLogger(AsyncDataTreeChangeListenerBase.class);
29 private static final int DATATREE_CHANGE_HANDLER_THREAD_POOL_CORE_SIZE = 1;
30 private static final int DATATREE_CHANGE_HANDLER_THREAD_POOL_MAX_SIZE = 1;
31 private static final int DATATREE_CHANGE_HANDLER_THREAD_POOL_KEEP_ALIVE_TIME_SECS = 300;
32 private static final int STARTUP_LOOP_TICK = 500;
33 private static final int STARTUP_LOOP_MAX_RETRIES = 8;
35 private ListenerRegistration<K> listenerRegistration;
37 private static ThreadPoolExecutor dataTreeChangeHandlerExecutor = new ThreadPoolExecutor(
38 DATATREE_CHANGE_HANDLER_THREAD_POOL_CORE_SIZE,
39 DATATREE_CHANGE_HANDLER_THREAD_POOL_MAX_SIZE,
40 DATATREE_CHANGE_HANDLER_THREAD_POOL_KEEP_ALIVE_TIME_SECS,
42 new LinkedBlockingQueue<Runnable>());
44 protected final Class<T> clazz;
45 private final Class<K> eventClazz;
47 public AsyncDataTreeChangeListenerBase(Class<T> clazz, Class<K> eventClazz) {
48 this.clazz = Preconditions.checkNotNull(clazz, "Class can not be null!");
49 this.eventClazz = Preconditions.checkNotNull(eventClazz, "eventClazz can not be null!");
53 public void onDataTreeChanged(Collection<DataTreeModification<T>> changes) {
54 if (changes == null || changes.isEmpty()) {
58 DataTreeChangeHandler dataTreeChangeHandler = new DataTreeChangeHandler(changes);
59 dataTreeChangeHandlerExecutor.execute(dataTreeChangeHandler);
62 public void registerListener(LogicalDatastoreType dsType, final DataBroker db) {
63 final DataTreeIdentifier<T> treeId = new DataTreeIdentifier<>(dsType, getWildCardPath());
65 TaskRetryLooper looper = new TaskRetryLooper(STARTUP_LOOP_TICK, STARTUP_LOOP_MAX_RETRIES);
66 listenerRegistration = looper.loopUntilNoException(new Callable<ListenerRegistration<K>>() {
68 public ListenerRegistration<K> call() throws Exception {
69 return db.registerDataTreeChangeListener(treeId, getDataTreeChangeListener());
72 } catch (final Exception e) {
73 LOG.warn("{}: Data Tree Change listener registration failed.", eventClazz.getName());
74 LOG.debug("{}: Data Tree Change listener registration failed: {}", eventClazz.getName(), e);
75 throw new IllegalStateException( eventClazz.getName() + "{}startup failed. System needs restart.", e);
80 public void close() throws Exception {
81 if (listenerRegistration != null) {
83 listenerRegistration.close();
84 } catch (final Exception e) {
85 LOG.error("Error when cleaning up DataTreeChangeListener.", e);
87 listenerRegistration = null;
91 protected abstract InstanceIdentifier<T> getWildCardPath();
92 protected abstract void remove(InstanceIdentifier<T> key, T dataObjectModification);
93 protected abstract void update(InstanceIdentifier<T> key, T dataObjectModificationBefore, T dataObjectModificationAfter);
94 protected abstract void add(InstanceIdentifier<T> key, T dataObjectModification);
95 protected abstract K getDataTreeChangeListener();
97 public class DataTreeChangeHandler implements Runnable {
98 Collection<DataTreeModification<T>> changes;
100 public DataTreeChangeHandler(Collection<DataTreeModification<T>> changes) {
101 this.changes = changes;
108 for (DataTreeModification<T> change : changes) {
109 final InstanceIdentifier<T> key = change.getRootPath().getRootIdentifier();
110 final DataObjectModification<T> mod = change.getRootNode();
112 switch (mod.getModificationType()) {
114 remove(key, mod.getDataBefore());
116 case SUBTREE_MODIFIED:
117 update(key, mod.getDataBefore(), mod.getDataAfter());
120 if (mod.getDataBefore() == null) {
121 add(key, mod.getDataAfter());
123 update(key, mod.getDataBefore(), mod.getDataAfter());
127 // FIXME: May be not a good idea to throw.
128 throw new IllegalArgumentException("Unhandled modification type " + mod.getModificationType());