2 * Copyright (c) 2015 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.vpnservice.datastoreutils;
10 import com.google.common.base.Optional;
11 import com.google.common.base.Preconditions;
12 import org.opendaylight.controller.md.sal.binding.api.*;
13 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
14 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
15 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
16 import org.opendaylight.yangtools.concepts.ListenerRegistration;
17 import org.opendaylight.yangtools.yang.binding.DataObject;
18 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory;
22 import java.util.Collections;
25 import java.util.concurrent.Callable;
26 import java.util.concurrent.LinkedBlockingQueue;
27 import java.util.concurrent.ThreadPoolExecutor;
28 import java.util.concurrent.TimeUnit;
30 public abstract class AsyncDataChangeListenerBase<T extends DataObject, K extends DataChangeListener> implements DataChangeListener, AutoCloseable {
31 private static final Logger LOG = LoggerFactory.getLogger(AsyncDataChangeListenerBase.class);
33 private static final int DATATREE_CHANGE_HANDLER_THREAD_POOL_CORE_SIZE = 1;
34 private static final int DATATREE_CHANGE_HANDLER_THREAD_POOL_MAX_SIZE = 1;
35 private static final int DATATREE_CHANGE_HANDLER_THREAD_POOL_KEEP_ALIVE_TIME_SECS = 300;
36 private static final int STARTUP_LOOP_TICK = 500;
37 private static final int STARTUP_LOOP_MAX_RETRIES = 8;
39 private static ThreadPoolExecutor dataChangeHandlerExecutor = new ThreadPoolExecutor(
40 DATATREE_CHANGE_HANDLER_THREAD_POOL_CORE_SIZE,
41 DATATREE_CHANGE_HANDLER_THREAD_POOL_MAX_SIZE,
42 DATATREE_CHANGE_HANDLER_THREAD_POOL_KEEP_ALIVE_TIME_SECS,
44 new LinkedBlockingQueue<Runnable>());
46 private ListenerRegistration<K> listenerRegistration;
47 protected final Class<T> clazz;
48 private final Class<K> eventClazz;
51 * @param clazz - for which the data change event is received
53 public AsyncDataChangeListenerBase(Class<T> clazz, Class<K> eventClazz) {
54 this.clazz = Preconditions.checkNotNull(clazz, "Class can not be null!");
55 this.eventClazz = Preconditions.checkNotNull(eventClazz, "eventClazz can not be null!");
58 public void registerListener(final LogicalDatastoreType dsType, final DataBroker db) {
60 TaskRetryLooper looper = new TaskRetryLooper(STARTUP_LOOP_TICK, STARTUP_LOOP_MAX_RETRIES);
61 listenerRegistration = looper.loopUntilNoException(new Callable<ListenerRegistration<K>>() {
63 public ListenerRegistration call() throws Exception {
64 return db.registerDataChangeListener(dsType, getWildCardPath(), getDataChangeListener(), getDataChangeScope());
67 } catch (final Exception e) {
68 LOG.warn("{}: Data Tree Change listener registration failed.", eventClazz.getName());
69 LOG.debug("{}: Data Tree Change listener registration failed: {}", eventClazz.getName(), e);
70 throw new IllegalStateException( eventClazz.getName() + "{}startup failed. System needs restart.", e);
75 public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> changeEvent) {
76 if (changeEvent == null) {
80 DataChangeHandler dataChangeHandler = new DataChangeHandler(changeEvent);
81 dataChangeHandlerExecutor.execute(dataChangeHandler);
84 @SuppressWarnings("unchecked")
85 private void createData(final Map<InstanceIdentifier<?>, DataObject> createdData) {
86 final Set<InstanceIdentifier<?>> keys = createdData.keySet() != null
87 ? createdData.keySet() : Collections.<InstanceIdentifier<?>>emptySet();
88 for (InstanceIdentifier<?> key : keys) {
89 if (clazz.equals(key.getTargetType())) {
90 InstanceIdentifier<T> createKeyIdent = key.firstIdentifierOf(clazz);
91 final Optional<DataObject> value = Optional.of(createdData.get(key));
92 if (value.isPresent()) {
93 this.add(createKeyIdent, (T)value.get());
99 @SuppressWarnings("unchecked")
100 private void updateData(final Map<InstanceIdentifier<?>, DataObject> updateData,
101 final Map<InstanceIdentifier<?>, DataObject> originalData) {
103 final Set<InstanceIdentifier<?>> keys = updateData.keySet() != null
104 ? updateData.keySet() : Collections.<InstanceIdentifier<?>>emptySet();
105 for (InstanceIdentifier<?> key : keys) {
106 if (clazz.equals(key.getTargetType())) {
107 InstanceIdentifier<T> updateKeyIdent = key.firstIdentifierOf(clazz);
108 final Optional<DataObject> value = Optional.of(updateData.get(key));
109 final Optional<DataObject> original = Optional.of(originalData.get(key));
110 if (value.isPresent() && original.isPresent()) {
111 this.update(updateKeyIdent, (T) original.get(), (T) value.get());
117 @SuppressWarnings("unchecked")
118 private void removeData(final Set<InstanceIdentifier<?>> removeData,
119 final Map<InstanceIdentifier<?>, DataObject> originalData) {
121 for (InstanceIdentifier<?> key : removeData) {
122 if (clazz.equals(key.getTargetType())) {
123 final InstanceIdentifier<T> ident = key.firstIdentifierOf(clazz);
124 final DataObject removeValue = originalData.get(key);
125 this.remove(ident, (T)removeValue);
131 public void close() throws Exception {
132 if (listenerRegistration != null) {
134 listenerRegistration.close();
135 } catch (final Exception e) {
136 LOG.error("Error when cleaning up DataChangeListener.", e);
138 listenerRegistration = null;
140 LOG.info("Interface Manager Closed");
143 protected abstract void remove(InstanceIdentifier<T> identifier, T del);
145 protected abstract void update(InstanceIdentifier<T> identifier, T original, T update);
147 protected abstract void add(InstanceIdentifier<T> identifier, T add);
149 protected abstract InstanceIdentifier<T> getWildCardPath();
151 protected abstract DataChangeListener getDataChangeListener();
153 protected abstract AsyncDataBroker.DataChangeScope getDataChangeScope();
155 public class DataChangeHandler implements Runnable {
156 final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> changeEvent;
158 public DataChangeHandler(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> changeEvent) {
159 this.changeEvent = changeEvent;
164 Preconditions.checkNotNull(changeEvent,"Async ChangeEvent can not be null!");
166 /* All DataObjects for create */
167 final Map<InstanceIdentifier<?>, DataObject> createdData = changeEvent.getCreatedData() != null
168 ? changeEvent.getCreatedData() : Collections.<InstanceIdentifier<?>, DataObject>emptyMap();
169 /* All DataObjects for remove */
170 final Set<InstanceIdentifier<?>> removeData = changeEvent.getRemovedPaths() != null
171 ? changeEvent.getRemovedPaths() : Collections.<InstanceIdentifier<?>>emptySet();
172 /* All DataObjects for updates */
173 final Map<InstanceIdentifier<?>, DataObject> updateData = changeEvent.getUpdatedData() != null
174 ? changeEvent.getUpdatedData() : Collections.<InstanceIdentifier<?>, DataObject>emptyMap();
175 /* All Original DataObjects */
176 final Map<InstanceIdentifier<?>, DataObject> originalData = changeEvent.getOriginalData() != null
177 ? changeEvent.getOriginalData() : Collections.<InstanceIdentifier<?>, DataObject>emptyMap();
179 createData(createdData);
180 updateData(updateData, originalData);
181 removeData(removeData, originalData);