/* * Copyright (c) 2015 Ericsson India Global Services Pvt Ltd. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.vpnservice.datastoreutils; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import org.opendaylight.controller.md.sal.binding.api.*; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public abstract class AsyncClusteredDataChangeListenerBase implements ClusteredDataChangeListener, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(AsyncClusteredDataChangeListenerBase.class); private static final int DATATREE_CHANGE_HANDLER_THREAD_POOL_CORE_SIZE = 1; private static final int DATATREE_CHANGE_HANDLER_THREAD_POOL_MAX_SIZE = 1; private static final int DATATREE_CHANGE_HANDLER_THREAD_POOL_KEEP_ALIVE_TIME_SECS = 300; private static final int STARTUP_LOOP_TICK = 500; private static final int STARTUP_LOOP_MAX_RETRIES = 8; private static ThreadPoolExecutor dataChangeHandlerExecutor = new ThreadPoolExecutor( DATATREE_CHANGE_HANDLER_THREAD_POOL_CORE_SIZE, DATATREE_CHANGE_HANDLER_THREAD_POOL_MAX_SIZE, DATATREE_CHANGE_HANDLER_THREAD_POOL_KEEP_ALIVE_TIME_SECS, TimeUnit.SECONDS, new LinkedBlockingQueue()); private ListenerRegistration listenerRegistration; protected final Class clazz; private final Class eventClazz; /** * @param clazz - for which the data change event is received */ public AsyncClusteredDataChangeListenerBase(Class clazz, Class eventClazz) { this.clazz = Preconditions.checkNotNull(clazz, "Class can not be null!"); this.eventClazz = Preconditions.checkNotNull(eventClazz, "eventClazz can not be null!"); } public void registerListener(final LogicalDatastoreType dsType, final DataBroker db) { try { TaskRetryLooper looper = new TaskRetryLooper(STARTUP_LOOP_TICK, STARTUP_LOOP_MAX_RETRIES); listenerRegistration = looper.loopUntilNoException(new Callable>() { @Override public ListenerRegistration call() throws Exception { return db.registerDataChangeListener(dsType, getWildCardPath(), getDataChangeListener(), getDataChangeScope()); } }); } catch (final Exception e) { LOG.warn("{}: Data Tree Change listener registration failed.", eventClazz.getName()); LOG.debug("{}: Data Tree Change listener registration failed: {}", eventClazz.getName(), e); throw new IllegalStateException( eventClazz.getName() + "{}startup failed. System needs restart.", e); } } @Override public void onDataChanged(final AsyncDataChangeEvent, DataObject> changeEvent) { if (changeEvent == null) { return; } DataChangeHandler dataChangeHandler = new DataChangeHandler(changeEvent); dataChangeHandlerExecutor.execute(dataChangeHandler); } @SuppressWarnings("unchecked") private void createData(final Map, DataObject> createdData) { final Set> keys = createdData.keySet() != null ? createdData.keySet() : Collections.>emptySet(); for (InstanceIdentifier key : keys) { if (clazz.equals(key.getTargetType())) { InstanceIdentifier createKeyIdent = key.firstIdentifierOf(clazz); final Optional value = Optional.of(createdData.get(key)); if (value.isPresent()) { this.add(createKeyIdent, (T)value.get()); } } } } @SuppressWarnings("unchecked") private void updateData(final Map, DataObject> updateData, final Map, DataObject> originalData) { final Set> keys = updateData.keySet() != null ? updateData.keySet() : Collections.>emptySet(); for (InstanceIdentifier key : keys) { if (clazz.equals(key.getTargetType())) { InstanceIdentifier updateKeyIdent = key.firstIdentifierOf(clazz); final Optional value = Optional.of(updateData.get(key)); final Optional original = Optional.of(originalData.get(key)); if (value.isPresent() && original.isPresent()) { this.update(updateKeyIdent, (T) original.get(), (T) value.get()); } } } } @SuppressWarnings("unchecked") private void removeData(final Set> removeData, final Map, DataObject> originalData) { for (InstanceIdentifier key : removeData) { if (clazz.equals(key.getTargetType())) { final InstanceIdentifier ident = key.firstIdentifierOf(clazz); final DataObject removeValue = originalData.get(key); this.remove(ident, (T)removeValue); } } } @Override public void close() throws Exception { if (listenerRegistration != null) { try { listenerRegistration.close(); } catch (final Exception e) { LOG.error("Error when cleaning up ClusteredDataChangeListener.", e); } listenerRegistration = null; } LOG.info("Interface Manager Closed"); } protected abstract void remove(InstanceIdentifier identifier, T del); protected abstract void update(InstanceIdentifier identifier, T original, T update); protected abstract void add(InstanceIdentifier identifier, T add); protected abstract InstanceIdentifier getWildCardPath(); protected abstract ClusteredDataChangeListener getDataChangeListener(); protected abstract AsyncDataBroker.DataChangeScope getDataChangeScope(); public class DataChangeHandler implements Runnable { final AsyncDataChangeEvent, DataObject> changeEvent; public DataChangeHandler(final AsyncDataChangeEvent, DataObject> changeEvent) { this.changeEvent = changeEvent; } @Override public void run() { Preconditions.checkNotNull(changeEvent,"Async ChangeEvent can not be null!"); /* All DataObjects for create */ final Map, DataObject> createdData = changeEvent.getCreatedData() != null ? changeEvent.getCreatedData() : Collections., DataObject>emptyMap(); /* All DataObjects for remove */ final Set> removeData = changeEvent.getRemovedPaths() != null ? changeEvent.getRemovedPaths() : Collections.>emptySet(); /* All DataObjects for updates */ final Map, DataObject> updateData = changeEvent.getUpdatedData() != null ? changeEvent.getUpdatedData() : Collections., DataObject>emptyMap(); /* All Original DataObjects */ final Map, DataObject> originalData = changeEvent.getOriginalData() != null ? changeEvent.getOriginalData() : Collections., DataObject>emptyMap(); createData(createdData); updateData(updateData, originalData); removeData(removeData, originalData); } } }