/*
 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.mdsal.dom.store.inmemory;

import com.google.common.collect.ImmutableList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.Executor;
import javax.annotation.Nonnull;
import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration;
import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager.Invoker;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Change publisher for an in-memory DOM data tree shard which hands every
 * {@link DataTreeCandidate} to a {@link QueuedNotificationManager} instead of
 * invoking listeners directly.
 */
final class InMemoryDOMDataTreeShardChangePublisher extends AbstractDOMShardTreeChangePublisher {

    private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataTreeShardChangePublisher.class);

    /**
     * Delivers a single queued candidate to the listener behind a registration.
     * The candidate is wrapped in a one-element immutable list. Nothing is
     * delivered when the registration reports no listener instance
     * (NOTE(review): presumably the case once the registration is closed - confirm).
     */
    private static final Invoker<AbstractDOMDataTreeChangeListenerRegistration<?>, DataTreeCandidate>
        MANAGER_INVOKER = (listener, notification) -> {
            final DOMDataTreeChangeListener inst = listener.getInstance();
            if (inst != null) {
                inst.onDataTreeChanged(ImmutableList.of(notification));
            }
        };

    /** Queue through which all listener notifications of this publisher are submitted. */
    private final QueuedNotificationManager<AbstractDOMDataTreeChangeListenerRegistration<?>, DataTreeCandidate>
        notificationManager;

    /**
     * Creates the publisher together with its notification queue manager.
     *
     * @param executor executor handed to the {@link QueuedNotificationManager}
     * @param maxQueueSize maximum queue size handed to the {@link QueuedNotificationManager}
     * @param dataTree data tree passed through to the superclass
     * @param rootPath root path passed through to the superclass
     * @param childShards child shards passed through to the superclass
     */
    // NOTE(review): the map's type arguments were lost in this copy of the file;
    // they are restored to match the superclass constructor - confirm against it.
    InMemoryDOMDataTreeShardChangePublisher(final Executor executor,
                                            final int maxQueueSize,
                                            final DataTree dataTree,
                                            final YangInstanceIdentifier rootPath,
                                            final Map<DOMDataTreeIdentifier, ChildShardContext> childShards) {
        super(dataTree, rootPath, childShards);
        notificationManager = new QueuedNotificationManager<>(
                executor, MANAGER_INVOKER, maxQueueSize, "DataTreeChangeListenerQueueMgr");
    }

    /**
     * Builds one candidate from {@code path} and {@code node} and submits that
     * same candidate to the notification manager once per registration.
     *
     * @param registrations registrations to notify
     * @param path path used as the root of the created candidate
     * @param node candidate node wrapped into the created candidate
     */
    @Override
    protected void notifyListeners(
            @Nonnull final Collection<AbstractDOMDataTreeChangeListenerRegistration<?>> registrations,
            @Nonnull final YangInstanceIdentifier path,
            @Nonnull final DataTreeCandidateNode node) {
        final DataTreeCandidate candidate = DataTreeCandidates.newDataTreeCandidate(path, node);

        for (final AbstractDOMDataTreeChangeListenerRegistration<?> reg : registrations) {
            // Log the single registration being enqueued, not the whole collection.
            LOG.debug("Enqueueing candidate {} to registration {}", candidate, reg);
            notificationManager.submitNotification(reg, candidate);
        }
    }

    /**
     * Logs the removal of a registration at debug level; no other action is taken here.
     *
     * @param registration registration that was removed
     */
    @Override
    protected void registrationRemoved(
            @Nonnull final AbstractDOMDataTreeChangeListenerRegistration<?> registration) {
        LOG.debug("Closing registration {}", registration);
    }

    /**
     * Registers a tree change listener by delegating unchanged to the superclass.
     *
     * @param path path the listener is registered on
     * @param listener listener to register
     * @param <L> listener type
     * @return the registration returned by the superclass
     */
    @Override
    public <L extends DOMDataTreeChangeListener> AbstractDOMDataTreeChangeListenerRegistration<L>
            registerTreeChangeListener(final YangInstanceIdentifier path, final L listener) {
        return super.registerTreeChangeListener(path, listener);
    }

    /**
     * Processes a candidate via {@code processCandidateTree} while holding this
     * instance's monitor, so concurrent callers are handled one at a time.
     *
     * @param candidate candidate to process
     */
    synchronized void publishChange(@Nonnull final DataTreeCandidate candidate) {
        processCandidateTree(candidate);
    }
}