Shared Endpoint RPC for multiple render
[groupbasedpolicy.git] / groupbasedpolicy / src / main / java / org / opendaylight / groupbasedpolicy / endpoint / EndpointRpcRegistry.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.groupbasedpolicy.endpoint;
10
11 import java.util.Map.Entry;
12 import java.util.concurrent.ConcurrentHashMap;
13 import java.util.concurrent.ConcurrentMap;
14 import java.util.concurrent.Future;
15 import java.util.concurrent.ScheduledExecutorService;
16
17 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
18 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
19 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
20 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
21 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
22 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.EndpointService;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.Endpoints;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.EndpointsBuilder;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.RegisterEndpointInput;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.SetEndpointGroupConditionsInput;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.UnregisterEndpointInput;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.UnsetEndpointGroupConditionsInput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoint.fields.L3Address;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.ConditionMapping;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.ConditionMappingKey;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.Endpoint;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointBuilder;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointKey;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointL3;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointL3Builder;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointL3Key;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.has.endpoint.group.conditions.EndpointGroupCondition;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.has.endpoint.group.conditions.EndpointGroupConditionKey;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.unregister.endpoint.input.L2;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.unregister.endpoint.input.L3;
43 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
44 import org.opendaylight.yangtools.yang.common.RpcResult;
45 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48
49 import com.google.common.base.Function;
50 import com.google.common.util.concurrent.CheckedFuture;
51 import com.google.common.util.concurrent.FutureCallback;
52 import com.google.common.util.concurrent.Futures;
53 import com.google.common.util.concurrent.ListenableFuture;
54
55 /**
56  * Endpoint registry provides a scalable store for accessing and 
57  * updating information about endpoints.
58  * @author readamsO
59  */
60 public class EndpointRpcRegistry implements EndpointService {
61     private static final Logger LOG = 
62             LoggerFactory.getLogger(EndpointRpcRegistry.class);
63
64     private final DataBroker dataProvider;
65     private final ScheduledExecutorService executor;
66     private final RpcProviderRegistry rpcRegistry;
67     private static EndpointRpcRegistry endpointRpcRegistry;
68
69     final BindingAwareBroker.RpcRegistration<EndpointService> rpcRegistration;
70
71     private final static ConcurrentMap<String, EpRendererAugmentation> registeredRenderers = new ConcurrentHashMap<String, EpRendererAugmentation>();
72
73
74     /**
75      * This method registers a renderer for endpoint RPC API. This method
76      * ensures single RPC registration for all renderers since a single RPC
77      * registration is only allowed.
78      *
79      * @param dataProvider
80      *            - the dataProvider
81      * @param rpcRegistry
82      *            - the rpcRegistry
83      * @param executor
84      *            - thread pool executor
85      * @param epRendererAugmentation
86      *            - specific implementation RPC augmentation, if any. Otherwise NULL
87      */
88     public static void register(DataBroker dataProvider,
89             RpcProviderRegistry rpcRegistry, ScheduledExecutorService executor,
90             EpRendererAugmentation epRendererAugmentation) {
91         if (dataProvider == null || rpcRegistry == null || executor == null) {
92             if (epRendererAugmentation != null) {
93                 LOG.warn("Couldn't register class {} for endpoint RPC because of missing required info");
94             }
95             return;
96         }
97         if (endpointRpcRegistry == null) {
98             synchronized (EndpointRpcRegistry.class) {
99                 if (endpointRpcRegistry == null) {
100                     endpointRpcRegistry = new EndpointRpcRegistry(dataProvider,
101                             rpcRegistry, executor);
102                 }
103             }
104         }
105         if (epRendererAugmentation != null) {
106             registeredRenderers.putIfAbsent(epRendererAugmentation.getClass()
107                     .getName(), epRendererAugmentation);
108         }
109     }
110
111     /**
112      *
113      * @param regImp
114      * @throws Exception
115      */
116     public static void unregister(EpRendererAugmentation regImp)
117             throws Exception {
118         if (regImp == null
119                 || !registeredRenderers
120                         .containsKey(regImp.getClass().getName())) {
121             return;
122         }
123         registeredRenderers.remove(regImp.getClass().getName());
124         LOG.info("Unregistered {}", regImp.getClass().getName());
125         if (registeredRenderers.isEmpty() && endpointRpcRegistry != null) {
126             synchronized (EndpointRpcRegistry.class) {
127                 if (registeredRenderers.isEmpty()
128                         && endpointRpcRegistry != null) {
129                     endpointRpcRegistry.rpcRegistration.close();
130                     endpointRpcRegistry = null;
131                 }
132             }
133         }
134     }
135
136     /**
137      * Constructor
138      *
139      * @param dataProvider
140      * @param rpcRegistry
141      * @param executor
142      */
143     private EndpointRpcRegistry(DataBroker dataProvider,
144                                     RpcProviderRegistry rpcRegistry,
145                                     ScheduledExecutorService executor) {
146         this.dataProvider = dataProvider;
147         this.executor = executor;
148         this.rpcRegistry = rpcRegistry;
149
150         if (this.rpcRegistry != null) {
151             rpcRegistration =
152                     this.rpcRegistry.addRpcImplementation(EndpointService.class, this);
153             LOG.debug("Added RPC Implementation Correctly");
154         } else
155             rpcRegistration = null;
156         
157         if (dataProvider != null) {
158             // XXX - This is a hack to avoid a bug in the data broker
159             // API where you have to write all the parents before you can write
160             // a child
161             InstanceIdentifier<Endpoints> iid = 
162                     InstanceIdentifier.builder(Endpoints.class).build();
163             WriteTransaction t = this.dataProvider.newWriteOnlyTransaction();
164             t.put(LogicalDatastoreType.OPERATIONAL, 
165                   iid, new EndpointsBuilder().build());
166             CheckedFuture<Void, TransactionCommitFailedException> f = t.submit();
167             Futures.addCallback(f, new FutureCallback<Void>() {
168                 @Override
169                 public void onFailure(Throwable t) {
170                     LOG.error("Could not write endpoint base container", t);
171                 }
172
173                 @Override
174                 public void onSuccess(Void result) {
175                     // TODO Auto-generated method stub
176                     
177                 }
178             });
179         }
180
181         // XXX TODO - age out endpoint data and remove 
182         // endpoint group/condition mappings with no conditions
183     }
184   
185     /**
186      * Construct an endpoint with the appropriate augmentations from the 
187      * endpoint input.  Each concrete implementation can provides its specifics earlier.
188      * @param input the input object
189      */
190     private EndpointBuilder buildEndpoint(RegisterEndpointInput input) {
191         EndpointBuilder eb = new EndpointBuilder(input);
192         for (Entry<String, EpRendererAugmentation> entry : registeredRenderers
193                 .entrySet()) {
194             try {
195                 entry.getValue().buildEndpointAugmentation(eb, input);
196             } catch (Throwable t) {
197                 LOG.warn("Endpoint Augmentation error while processing "
198                         + entry.getKey() + ". Reason: ", t);
199             }
200         }
201         return eb;
202     }
203
204     /**
205      * Construct an L3 endpoint with the appropriate augmentations from the 
206      * endpoint input.  Each concrete implementation can provides its specifics earlier.
207      * @param input the input object
208      */
209     private EndpointL3Builder buildEndpointL3(RegisterEndpointInput input) {
210         EndpointL3Builder eb = new EndpointL3Builder(input);
211         for (Entry<String, EpRendererAugmentation> entry : registeredRenderers
212                 .entrySet()) {
213             try {
214                 entry.getValue().buildEndpointL3Augmentation(eb, input);
215             } catch (Throwable t) {
216                 LOG.warn("L3 endpoint Augmentation error while processing "
217                         + entry.getKey() + ". Reason: ", t);
218             }
219         }
220         return eb;
221     }
222
223     @Override
224     public Future<RpcResult<Void>>
225         registerEndpoint(RegisterEndpointInput input) {
226         long timestamp = System.currentTimeMillis();
227         
228         WriteTransaction t = dataProvider.newWriteOnlyTransaction();
229
230         if (input.getL2Context() != null &&
231             input.getMacAddress() != null) {
232             Endpoint ep = buildEndpoint(input)
233                     .setTimestamp(timestamp)
234                     .build();
235
236             EndpointKey key = 
237                     new EndpointKey(ep.getL2Context(), ep.getMacAddress());
238             InstanceIdentifier<Endpoint> iid = 
239                     InstanceIdentifier.builder(Endpoints.class)
240                     .child(Endpoint.class, key)
241                     .build();
242             t.put(LogicalDatastoreType.OPERATIONAL, iid, ep);
243         }
244         if (input.getL3Address() != null) {
245             for (L3Address l3addr : input.getL3Address()) {
246                 EndpointL3Key key3 = new EndpointL3Key(l3addr.getIpAddress(), 
247                                                        l3addr.getL3Context());
248                 EndpointL3 ep3 = buildEndpointL3(input)
249                     .setIpAddress(key3.getIpAddress())
250                     .setL3Context(key3.getL3Context())
251                     .setTimestamp(timestamp)
252                     .build();
253                 InstanceIdentifier<EndpointL3> iid_l3 = 
254                         InstanceIdentifier.builder(Endpoints.class)
255                             .child(EndpointL3.class, key3)
256                             .build();
257                 t.put(LogicalDatastoreType.OPERATIONAL, iid_l3, ep3);
258             }
259         }
260         ListenableFuture<Void> r = t.submit();
261         return Futures.transform(r, futureTrans, executor);
262     }
263
264     @Override
265     public Future<RpcResult<Void>>
266         unregisterEndpoint(UnregisterEndpointInput input) {
267         WriteTransaction t = dataProvider.newWriteOnlyTransaction();
268         if (input.getL2() != null) {
269             for (L2 l2a : input.getL2()) {
270                 EndpointKey key = 
271                         new EndpointKey(l2a.getL2Context(), 
272                                         l2a.getMacAddress());
273                 InstanceIdentifier<Endpoint> iid = 
274                         InstanceIdentifier.builder(Endpoints.class)
275                         .child(Endpoint.class, key).build();
276                 t.delete(LogicalDatastoreType.OPERATIONAL, iid);
277             }
278         }
279         if (input.getL3() != null) {
280             for (L3 l3addr : input.getL3()) {
281                 EndpointL3Key key3 = 
282                         new EndpointL3Key(l3addr.getIpAddress(), 
283                                           l3addr.getL3Context());
284                 InstanceIdentifier<EndpointL3> iid_l3 = 
285                         InstanceIdentifier.builder(Endpoints.class)
286                         .child(EndpointL3.class, key3)
287                         .build();
288                 t.delete(LogicalDatastoreType.OPERATIONAL, iid_l3);
289             }
290         }
291
292         ListenableFuture<Void> r = t.submit();
293         return Futures.transform(r, futureTrans, executor);
294     }
295
296     @Override
297     public Future<RpcResult<Void>> 
298         setEndpointGroupConditions(SetEndpointGroupConditionsInput input) {
299         WriteTransaction t = dataProvider.newWriteOnlyTransaction();
300
301         ConditionMappingKey key = 
302                 new ConditionMappingKey(input.getEndpointGroup());
303         
304         for (EndpointGroupCondition condition: input.getEndpointGroupCondition()) {
305             EndpointGroupConditionKey ckey = 
306                     new EndpointGroupConditionKey(condition.getCondition());
307             InstanceIdentifier<EndpointGroupCondition> iid = 
308                     InstanceIdentifier.builder(Endpoints.class)
309                         .child(ConditionMapping.class, key)
310                         .child(EndpointGroupCondition.class, ckey)
311                         .build();
312             t.put(LogicalDatastoreType.OPERATIONAL, iid, condition);
313         }
314
315         ListenableFuture<Void> r = t.submit();
316         return Futures.transform(r, futureTrans, executor);
317     }
318
319     @Override
320     public Future<RpcResult<Void>> 
321         unsetEndpointGroupConditions(UnsetEndpointGroupConditionsInput input) {
322         WriteTransaction t = dataProvider.newWriteOnlyTransaction();
323
324         ConditionMappingKey key = 
325                 new ConditionMappingKey(input.getEndpointGroup());
326         
327         for (EndpointGroupCondition condition: input.getEndpointGroupCondition()) {
328             EndpointGroupConditionKey ckey = 
329                     new EndpointGroupConditionKey(condition.getCondition());
330             InstanceIdentifier<EndpointGroupCondition> iid = 
331                     InstanceIdentifier.builder(Endpoints.class)
332                         .child(ConditionMapping.class, key)
333                         .child(EndpointGroupCondition.class, ckey)
334                         .build();
335
336             t.delete(LogicalDatastoreType.OPERATIONAL, iid);
337         }
338
339         ListenableFuture<Void> r = t.submit();
340         return Futures.transform(r, futureTrans, executor);
341     }
342
343     Function<Void, RpcResult<Void>> futureTrans =
344             new Function<Void,RpcResult<Void>>() {
345         @Override
346         public RpcResult<Void> apply(Void input) {
347             return RpcResultBuilder.<Void>success().build();
348         }
349     };
350 }