/* * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.netconf.impl.osgi; import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet.Builder; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; import com.google.common.collect.Multimaps; import com.google.common.collect.Sets; import io.netty.util.internal.ConcurrentSet; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import org.opendaylight.netconf.api.capability.Capability; import org.opendaylight.netconf.api.monitoring.CapabilityListener; import org.opendaylight.netconf.mapping.api.NetconfOperation; import org.opendaylight.netconf.mapping.api.NetconfOperationService; import org.opendaylight.netconf.mapping.api.NetconfOperationServiceFactory; import org.opendaylight.netconf.mapping.api.NetconfOperationServiceFactoryListener; import org.opendaylight.netconf.util.CloseableUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * NetconfOperationService aggregator. Makes a collection of operation services accessible as one. */ public class AggregatedNetconfOperationServiceFactory implements NetconfOperationServiceFactory, NetconfOperationServiceFactoryListener, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(AggregatedNetconfOperationServiceFactory.class); private final Set factories = new ConcurrentSet<>(); private final Multimap registrations = Multimaps.synchronizedMultimap(HashMultimap.create()); private final Set listeners = new ConcurrentSet<>(); public AggregatedNetconfOperationServiceFactory() { } public AggregatedNetconfOperationServiceFactory(final List mappers) { mappers.forEach(this::onAddNetconfOperationServiceFactory); } @Override public synchronized void onAddNetconfOperationServiceFactory(NetconfOperationServiceFactory service) { factories.add(service); for (final CapabilityListener listener : listeners) { AutoCloseable reg = service.registerCapabilityListener(listener); registrations.put(service, reg); } } @SuppressWarnings("checkstyle:IllegalCatch") @Override public synchronized void onRemoveNetconfOperationServiceFactory(NetconfOperationServiceFactory service) { factories.remove(service); for (final AutoCloseable autoCloseable : registrations.get(service)) { try { autoCloseable.close(); } catch (Exception e) { LOG.warn("Unable to close listener registration", e); } } registrations.removeAll(service); } @Override public Set getCapabilities() { final HashSet capabilities = Sets.newHashSet(); for (final NetconfOperationServiceFactory factory : factories) { capabilities.addAll(factory.getCapabilities()); } return capabilities; } @Override public synchronized AutoCloseable registerCapabilityListener(final CapabilityListener listener) { final Map regs = Maps.newHashMap(); for (final NetconfOperationServiceFactory factory : factories) { final AutoCloseable reg = factory.registerCapabilityListener(listener); regs.put(factory, reg); } listeners.add(listener); return () -> { synchronized (AggregatedNetconfOperationServiceFactory.this) { listeners.remove(listener); CloseableUtil.closeAll(regs.values()); for (final Map.Entry reg : regs.entrySet()) { registrations.remove(reg.getKey(), reg.getValue()); } } }; } @Override public synchronized NetconfOperationService createService(final String netconfSessionIdForReporting) { return new AggregatedNetconfOperation(factories, netconfSessionIdForReporting); } @Override public synchronized void close() throws Exception { factories.clear(); for (AutoCloseable reg : registrations.values()) { reg.close(); } registrations.clear(); listeners.clear(); } private static final class AggregatedNetconfOperation implements NetconfOperationService { private final Set services; AggregatedNetconfOperation(final Set factories, final String netconfSessionIdForReporting) { final Builder b = ImmutableSet.builder(); for (final NetconfOperationServiceFactory factory : factories) { b.add(factory.createService(netconfSessionIdForReporting)); } this.services = b.build(); } @Override public Set getNetconfOperations() { final HashSet operations = Sets.newHashSet(); for (final NetconfOperationService service : services) { operations.addAll(service.getNetconfOperations()); } return operations; } @SuppressWarnings("checkstyle:IllegalCatch") @Override public void close() { try { CloseableUtil.closeAll(services); } catch (final Exception e) { throw new IllegalStateException("Unable to properly close all aggregated services", e); } } } }