Split out NetconfMapperAggregator
[netconf.git] / protocol / netconf-impl / src / main / java / org / opendaylight / netconf / impl / osgi / AggregatedNetconfOperationServiceFactory.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.impl.osgi;
9
10 import com.google.common.collect.HashMultimap;
11 import com.google.common.collect.ImmutableSet;
12 import com.google.common.collect.Multimap;
13 import com.google.common.collect.Multimaps;
14 import java.util.HashMap;
15 import java.util.HashSet;
16 import java.util.List;
17 import java.util.Map;
18 import java.util.Set;
19 import java.util.concurrent.ConcurrentHashMap;
20 import org.opendaylight.netconf.api.capability.Capability;
21 import org.opendaylight.netconf.api.monitoring.CapabilityListener;
22 import org.opendaylight.netconf.mapping.api.NetconfOperation;
23 import org.opendaylight.netconf.mapping.api.NetconfOperationService;
24 import org.opendaylight.netconf.mapping.api.NetconfOperationServiceFactory;
25 import org.opendaylight.netconf.mapping.api.NetconfOperationServiceFactoryListener;
26 import org.opendaylight.netconf.util.CloseableUtil;
27 import org.opendaylight.yangtools.concepts.AbstractRegistration;
28 import org.opendaylight.yangtools.concepts.Registration;
29
30 /**
31  * NetconfOperationService aggregator. Makes a collection of operation services accessible as one.
32  */
33 // Non-final for OSGi DS
34 // FIXME: split into abstract class for the multiple uses we have, which really is about which construct is being
35 //        invoked
36 public class AggregatedNetconfOperationServiceFactory
37         implements NetconfOperationServiceFactory, NetconfOperationServiceFactoryListener, AutoCloseable {
38     private final Set<NetconfOperationServiceFactory> factories = ConcurrentHashMap.newKeySet();
39     private final Multimap<NetconfOperationServiceFactory, Registration> registrations =
40             Multimaps.synchronizedMultimap(HashMultimap.create());
41     private final Set<CapabilityListener> listeners = ConcurrentHashMap.newKeySet();
42
43     public AggregatedNetconfOperationServiceFactory() {
44     }
45
46     public AggregatedNetconfOperationServiceFactory(final List<NetconfOperationServiceFactory> mappers) {
47         mappers.forEach(this::onAddNetconfOperationServiceFactory);
48     }
49
50     @Override
51     public final synchronized void onAddNetconfOperationServiceFactory(final NetconfOperationServiceFactory service) {
52         factories.add(service);
53
54         for (final CapabilityListener listener : listeners) {
55             registrations.put(service, service.registerCapabilityListener(listener));
56         }
57     }
58
59     @Override
60     public final synchronized void onRemoveNetconfOperationServiceFactory(
61             final NetconfOperationServiceFactory service) {
62         factories.remove(service);
63         registrations.removeAll(service).forEach(Registration::close);
64     }
65
66     @Override
67     public final Set<Capability> getCapabilities() {
68         final Set<Capability> capabilities = new HashSet<>();
69         for (final NetconfOperationServiceFactory factory : factories) {
70             capabilities.addAll(factory.getCapabilities());
71         }
72         return capabilities;
73     }
74
75     @Override
76     public final synchronized Registration registerCapabilityListener(final CapabilityListener listener) {
77         final Map<NetconfOperationServiceFactory, Registration> regs = new HashMap<>();
78
79         for (final NetconfOperationServiceFactory factory : factories) {
80             regs.put(factory, factory.registerCapabilityListener(listener));
81         }
82         listeners.add(listener);
83
84         return new AbstractRegistration() {
85
86             @Override
87             protected void removeRegistration() {
88                 synchronized (AggregatedNetconfOperationServiceFactory.this) {
89                     listeners.remove(listener);
90                     regs.values().forEach(Registration::close);
91                     for (var reg : regs.entrySet()) {
92                         registrations.remove(reg.getKey(), reg.getValue());
93                     }
94                 }
95             }
96         };
97     }
98
99     @Override
100     public final synchronized NetconfOperationService createService(final String netconfSessionIdForReporting) {
101         return new AggregatedNetconfOperation(factories, netconfSessionIdForReporting);
102     }
103
104     @Override
105     public synchronized void close() {
106         factories.clear();
107         registrations.values().forEach(Registration::close);
108         registrations.clear();
109         listeners.clear();
110     }
111
112     private static final class AggregatedNetconfOperation implements NetconfOperationService {
113         private final ImmutableSet<NetconfOperationService> services;
114
115         AggregatedNetconfOperation(final Set<NetconfOperationServiceFactory> factories,
116                                    final String netconfSessionIdForReporting) {
117             services = factories.stream()
118                 .map(factory -> factory.createService(netconfSessionIdForReporting))
119                 .collect(ImmutableSet.toImmutableSet());
120         }
121
122         @Override
123         public Set<NetconfOperation> getNetconfOperations() {
124             return services.stream()
125                 .flatMap(service -> service.getNetconfOperations().stream())
126                 .collect(ImmutableSet.toImmutableSet());
127         }
128
129         @SuppressWarnings("checkstyle:IllegalCatch")
130         @Override
131         public void close() {
132             try {
133                 CloseableUtil.closeAll(services);
134             } catch (final Exception e) {
135                 throw new IllegalStateException("Unable to properly close all aggregated services", e);
136             }
137         }
138     }
139 }