Bug 4341: Lifecycle Management of Endpoint Registry state
[groupbasedpolicy.git] / groupbasedpolicy / src / main / java / org / opendaylight / groupbasedpolicy / endpoint / EndpointRpcRegistry.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.groupbasedpolicy.endpoint;
10
11 import java.util.Map.Entry;
12 import java.util.concurrent.ConcurrentHashMap;
13 import java.util.concurrent.ConcurrentMap;
14 import java.util.concurrent.Executors;
15 import java.util.concurrent.Future;
16 import java.util.concurrent.ScheduledExecutorService;
17
18 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
19 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
20 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
21 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
22 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
23 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.EndpointService;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.Endpoints;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.EndpointsBuilder;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.RegisterEndpointInput;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.RegisterL3PrefixEndpointInput;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.SetEndpointGroupConditionsInput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.UnregisterEndpointInput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.UnsetEndpointGroupConditionsInput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoint.fields.L3Address;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.ConditionMapping;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.ConditionMappingKey;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.Endpoint;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointBuilder;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointKey;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointL3;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointL3Builder;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointL3Key;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointL3Prefix;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointL3PrefixBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointL3PrefixKey;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.has.endpoint.group.conditions.EndpointGroupCondition;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.has.endpoint.group.conditions.EndpointGroupConditionKey;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.unregister.endpoint.input.L2;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.unregister.endpoint.input.L3;
48 import org.opendaylight.yangtools.yang.binding.Augmentation;
49 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
50 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
51 import org.opendaylight.yangtools.yang.common.RpcResult;
52 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 import com.google.common.base.Function;
57 import com.google.common.util.concurrent.CheckedFuture;
58 import com.google.common.util.concurrent.FutureCallback;
59 import com.google.common.util.concurrent.Futures;
60 import com.google.common.util.concurrent.ListenableFuture;
61
62 /**
63  * Endpoint registry provides a scalable store for accessing and updating
64  * information about endpoints.
65  */
66 public class EndpointRpcRegistry implements EndpointService {
67     private static final Logger LOG =
68             LoggerFactory.getLogger(EndpointRpcRegistry.class);
69
70     private final DataBroker dataProvider;
71     private final ScheduledExecutorService executor;
72     private final RpcProviderRegistry rpcRegistry;
73     private static EndpointRpcRegistry endpointRpcRegistry;
74
75     final BindingAwareBroker.RpcRegistration<EndpointService> rpcRegistration;
76
77     private final static ConcurrentMap<String, EpRendererAugmentation> registeredRenderers = new ConcurrentHashMap<String, EpRendererAugmentation>();
78
79     /**
80      * This method registers a renderer for endpoint RPC API. This method
81      * ensures single RPC registration for all renderers since a single RPC
82      * registration is only allowed.
83      *
84      * @param dataProvider
85      *            - the dataProvider
86      * @param rpcRegistry
87      *            - the rpcRegistry
88      * @param epRendererAugmentation
89      *            - specific implementation RPC augmentation, if any. Otherwise
90      *            NULL
91      */
92     public static void register(DataBroker dataProvider,
93             RpcProviderRegistry rpcRegistry,
94             EpRendererAugmentation epRendererAugmentation) {
95         if (dataProvider == null || rpcRegistry == null) {
96             if (epRendererAugmentation != null) {
97                 LOG.warn("Couldn't register class {} for endpoint RPC because of missing required info", epRendererAugmentation);
98             }
99             return;
100         }
101         if (endpointRpcRegistry == null) {
102             synchronized (EndpointRpcRegistry.class) {
103                 if (endpointRpcRegistry == null) {
104                     ScheduledExecutorService executor = Executors
105                             .newScheduledThreadPool(Math
106                                     .max(Runtime.getRuntime()
107                                             .availableProcessors() * 3, 10));
108                     endpointRpcRegistry = new EndpointRpcRegistry(dataProvider,
109                             rpcRegistry, executor);
110                 }
111             }
112         }
113         if (epRendererAugmentation != null) {
114             registeredRenderers.putIfAbsent(epRendererAugmentation.getClass()
115                     .getName(), epRendererAugmentation);
116         }
117     }
118
119     /**
120      *
121      * @param regImp the endpoint augmentation
122      * @throws Exception
123      */
124     public static void unregister(EpRendererAugmentation regImp)
125             throws Exception {
126         if (regImp == null
127                 || !registeredRenderers
128                         .containsKey(regImp.getClass().getName())) {
129             return;
130         }
131         registeredRenderers.remove(regImp.getClass().getName());
132         LOG.info("Unregistered {}", regImp.getClass().getName());
133         if (registeredRenderers.isEmpty() && endpointRpcRegistry != null) {
134             synchronized (EndpointRpcRegistry.class) {
135                 if (registeredRenderers.isEmpty() && endpointRpcRegistry != null
136                         && endpointRpcRegistry.rpcRegistration != null) {
137                     endpointRpcRegistry.rpcRegistration.close();
138                     endpointRpcRegistry.executor.shutdown();
139                     endpointRpcRegistry = null;
140                 }
141             }
142         }
143     }
144
145     public static Class<?> getAugmentationContextType(
146             final Augmentation<?> augmentation) {
147         if (augmentation == null) {
148             return null;
149         }
150         final Class<?>[] augmentationInterfaces = augmentation.getClass()
151                 .getInterfaces();
152         if (augmentationInterfaces.length == 1) {
153             return augmentationInterfaces[0];
154         }
155         /*
156          * if here, then the way YANG tools generate augmentation code has
157          * changed, hence augmentation classes are not implemented by single
158          * interface anymore. This is very unlikely to happen, but if it did, we
159          * need to know about it in order to update this method.
160          */
161         LOG.error(
162                 "YANG Generated Code has Changed -- augmentation object {} is NOT implemented by one interface anymore",
163                 augmentation);
164         return null;
165     }
166
167     /**
168      * Constructor
169      *
170      * @param dataProvider the {@link DataBroker}
171      * @param rpcRegistry  the {@link RpcProviderRegistry}
172      * @param executor     the {@link ScheduledExecutorService}
173      */
174     private EndpointRpcRegistry(DataBroker dataProvider,
175             RpcProviderRegistry rpcRegistry,
176             ScheduledExecutorService executor) {
177         this.dataProvider = dataProvider;
178         this.executor = executor;
179         this.rpcRegistry = rpcRegistry;
180
181         if (this.rpcRegistry != null) {
182             rpcRegistration =
183                     this.rpcRegistry.addRpcImplementation(EndpointService.class, this);
184             LOG.debug("Added RPC Implementation Correctly");
185         } else
186             rpcRegistration = null;
187
188         if (dataProvider != null) {
189             InstanceIdentifier<Endpoints> iid =
190                     InstanceIdentifier.builder(Endpoints.class).build();
191             WriteTransaction t = this.dataProvider.newWriteOnlyTransaction();
192             t.put(LogicalDatastoreType.OPERATIONAL,
193                     iid, new EndpointsBuilder().build());
194             CheckedFuture<Void, TransactionCommitFailedException> f = t.submit();
195             Futures.addCallback(f, new FutureCallback<Void>() {
196                 @Override
197                 public void onFailure(Throwable t) {
198                     LOG.error("Could not write endpoint base container", t);
199                 }
200
201                 @Override
202                 public void onSuccess(Void result) {
203
204                 }
205             });
206         }
207
208         // TODO Be alagalah - age out endpoint data and remove
209         // endpoint group/condition mappings with no conditions
210     }
211
212     /**
213      * Construct an endpoint with the appropriate augmentations from the
214      * endpoint input. Each concrete implementation can provides its specifics
215      * earlier.
216      *
217      * @param input
218      *            the input object
219      */
220     private EndpointBuilder buildEndpoint(RegisterEndpointInput input) {
221         EndpointBuilder eb = new EndpointBuilder(input);
222         for (Entry<String, EpRendererAugmentation> entry : registeredRenderers
223                 .entrySet()) {
224             try {
225                 Augmentation<Endpoint> augmentation = entry.getValue()
226                         .buildEndpointAugmentation(input);
227                 if (augmentation != null) {
228                     @SuppressWarnings("unchecked")
229                     Class<? extends Augmentation<Endpoint>> augmentationType = (Class<? extends Augmentation<Endpoint>>) getAugmentationContextType(augmentation);
230                     eb.addAugmentation(augmentationType, augmentation);
231                 }
232             } catch (Exception e) {
233                 LOG.warn("Endpoint Augmentation error while processing "
234                         + entry.getKey() + ". Reason: ", e);
235             }
236         }
237         return eb;
238     }
239
240     /**
241      * Construct an L3 endpoint with the appropriate augmentations from the
242      * endpoint input. Each concrete implementation can provides its specifics
243      * earlier.
244      *
245      * @param input
246      *            the input object
247      */
248     private EndpointL3Builder buildEndpointL3(RegisterEndpointInput input) {
249         EndpointL3Builder eb = new EndpointL3Builder(input);
250         for (Entry<String, EpRendererAugmentation> entry : registeredRenderers
251                 .entrySet()) {
252             try {
253                 Augmentation<EndpointL3> augmentation = entry.getValue()
254                         .buildEndpointL3Augmentation(input);
255                 if (augmentation != null) {
256                     @SuppressWarnings("unchecked")
257                     Class<? extends Augmentation<EndpointL3>> augmentationType = (Class<? extends Augmentation<EndpointL3>>) getAugmentationContextType(augmentation);
258                     eb.addAugmentation(augmentationType, augmentation);
259                 }
260             } catch (Exception e) {
261                 LOG.warn("L3 endpoint Augmentation error while processing "
262                         + entry.getKey() + ". Reason: ", e);
263             }
264         }
265         return eb;
266     }
267
268     /**
269      * Construct an L3 endpoint with the appropriate augmentations from the
270      * endpoint input. Each concrete implementation can provides its specifics
271      * earlier.
272      *
273      * @param input
274      *            the input object
275      */
276     private EndpointL3PrefixBuilder buildL3PrefixEndpoint(RegisterL3PrefixEndpointInput input) {
277         EndpointL3PrefixBuilder eb = new EndpointL3PrefixBuilder(input);
278         for (Entry<String, EpRendererAugmentation> entry : registeredRenderers
279                 .entrySet()) {
280             try {
281                 entry.getValue().buildL3PrefixEndpointAugmentation(eb, input);
282             } catch (Exception e) {
283                 LOG.warn("L3 endpoint Augmentation error while processing "
284                         + entry.getKey() + ". Reason: ", e);
285             }
286         }
287         return eb;
288     }
289
290     @Override
291     public Future<RpcResult<Void>>
292             registerEndpoint(RegisterEndpointInput input) {
293         long timestamp = System.currentTimeMillis();
294
295         // TODO: Replicate RPC feedback implemented in L3Prefix register for
296         // unmet requirements.
297         WriteTransaction t = dataProvider.newWriteOnlyTransaction();
298
299         if (input.getL2Context() != null &&
300                 input.getMacAddress() != null) {
301             Endpoint ep = buildEndpoint(input)
302                     .setTimestamp(timestamp)
303                     .build();
304
305             EndpointKey key =
306                     new EndpointKey(ep.getL2Context(), ep.getMacAddress());
307             InstanceIdentifier<Endpoint> iid =
308                     InstanceIdentifier.builder(Endpoints.class)
309                             .child(Endpoint.class, key)
310                             .build();
311             t.put(LogicalDatastoreType.OPERATIONAL, iid, ep, true);
312         }
313         if (input.getL3Address() != null) {
314             for (L3Address l3addr : input.getL3Address()) {
315                 EndpointL3Key key3 = new EndpointL3Key(l3addr.getIpAddress(),
316                         l3addr.getL3Context());
317                 EndpointL3 ep3 = buildEndpointL3(input)
318                         .setIpAddress(key3.getIpAddress())
319                         .setL3Context(key3.getL3Context())
320                         .setTimestamp(timestamp)
321                         .build();
322                 InstanceIdentifier<EndpointL3> iid_l3 =
323                         InstanceIdentifier.builder(Endpoints.class)
324                                 .child(EndpointL3.class, key3)
325                                 .build();
326                 t.put(LogicalDatastoreType.OPERATIONAL, iid_l3, ep3, true);
327             }
328         }
329         ListenableFuture<Void> r = t.submit();
330         return Futures.transform(r, futureTrans, executor);
331     }
332
333     @Override
334     public Future<RpcResult<Void>> registerL3PrefixEndpoint(RegisterL3PrefixEndpointInput input) {
335
336         if (input.getL3Context() == null) {
337             return Futures.immediateFuture(RpcResultBuilder.<Void> failed()
338                     .withError(ErrorType.RPC, "L3 Prefix Endpoint must have L3Context.").build());
339         }
340         if (input.getIpPrefix() == null) {
341             return Futures.immediateFuture(RpcResultBuilder.<Void> failed()
342                     .withError(ErrorType.RPC, "L3 Prefix Endpoint must have ip-prefix.").build());
343         }
344
345         if (input.getTenant() == null) {
346             return Futures.immediateFuture(RpcResultBuilder.<Void> failed()
347                     .withError(ErrorType.RPC, "L3 Prefix Endpoint must have tenant.").build());
348         }
349
350         WriteTransaction t = dataProvider.newWriteOnlyTransaction();
351
352         long timestamp = System.currentTimeMillis();
353
354         // TODO: Convert IPPrefix into it's IPv4/IPv6 canonical form.
355         // See org.apache.commons.net.util.SubnetUtils.SubnetInfo
356
357         EndpointL3PrefixKey epL3PrefixKey = new EndpointL3PrefixKey(input.getIpPrefix(), input.getL3Context());
358
359         EndpointL3Prefix epL3Prefix = buildL3PrefixEndpoint(input).setTimestamp(timestamp).build();
360         InstanceIdentifier<EndpointL3Prefix> iid_l3prefix =
361                 InstanceIdentifier.builder(Endpoints.class)
362                         .child(EndpointL3Prefix.class, epL3PrefixKey)
363                         .build();
364         t.put(LogicalDatastoreType.OPERATIONAL, iid_l3prefix, epL3Prefix);
365
366         ListenableFuture<Void> r = t.submit();
367         return Futures.transform(r, futureTrans, executor);
368     }
369
370     @Override
371     public Future<RpcResult<Void>>
372             unregisterEndpoint(UnregisterEndpointInput input) {
373         WriteTransaction t = dataProvider.newWriteOnlyTransaction();
374         if (input.getL2() != null) {
375             for (L2 l2a : input.getL2()) {
376                 EndpointKey key =
377                         new EndpointKey(l2a.getL2Context(),
378                                 l2a.getMacAddress());
379                 InstanceIdentifier<Endpoint> iid =
380                         InstanceIdentifier.builder(Endpoints.class)
381                                 .child(Endpoint.class, key).build();
382                 t.delete(LogicalDatastoreType.OPERATIONAL, iid);
383             }
384         }
385         if (input.getL3() != null) {
386             for (L3 l3addr : input.getL3()) {
387                 EndpointL3Key key3 =
388                         new EndpointL3Key(l3addr.getIpAddress(),
389                                 l3addr.getL3Context());
390                 InstanceIdentifier<EndpointL3> iid_l3 =
391                         InstanceIdentifier.builder(Endpoints.class)
392                                 .child(EndpointL3.class, key3)
393                                 .build();
394                 t.delete(LogicalDatastoreType.OPERATIONAL, iid_l3);
395             }
396         }
397         // TODO: Implement L3Prefix
398
399         ListenableFuture<Void> r = t.submit();
400         return Futures.transform(r, futureTrans, executor);
401     }
402
403     @Override
404     public Future<RpcResult<Void>>
405             setEndpointGroupConditions(SetEndpointGroupConditionsInput input) {
406         WriteTransaction t = dataProvider.newWriteOnlyTransaction();
407
408         ConditionMappingKey key =
409                 new ConditionMappingKey(input.getEndpointGroup());
410
411         for (EndpointGroupCondition condition : input.getEndpointGroupCondition()) {
412             EndpointGroupConditionKey ckey =
413                     new EndpointGroupConditionKey(condition.getCondition());
414             InstanceIdentifier<EndpointGroupCondition> iid =
415                     InstanceIdentifier.builder(Endpoints.class)
416                             .child(ConditionMapping.class, key)
417                             .child(EndpointGroupCondition.class, ckey)
418                             .build();
419             t.put(LogicalDatastoreType.OPERATIONAL, iid, condition);
420         }
421
422         ListenableFuture<Void> r = t.submit();
423         return Futures.transform(r, futureTrans, executor);
424     }
425
426     @Override
427     public Future<RpcResult<Void>>
428             unsetEndpointGroupConditions(UnsetEndpointGroupConditionsInput input) {
429         WriteTransaction t = dataProvider.newWriteOnlyTransaction();
430
431         ConditionMappingKey key =
432                 new ConditionMappingKey(input.getEndpointGroup());
433
434         for (EndpointGroupCondition condition : input.getEndpointGroupCondition()) {
435             EndpointGroupConditionKey ckey =
436                     new EndpointGroupConditionKey(condition.getCondition());
437             InstanceIdentifier<EndpointGroupCondition> iid =
438                     InstanceIdentifier.builder(Endpoints.class)
439                             .child(ConditionMapping.class, key)
440                             .child(EndpointGroupCondition.class, ckey)
441                             .build();
442
443             t.delete(LogicalDatastoreType.OPERATIONAL, iid);
444         }
445
446         ListenableFuture<Void> r = t.submit();
447         return Futures.transform(r, futureTrans, executor);
448     }
449
450     Function<Void, RpcResult<Void>> futureTrans =
451             new Function<Void, RpcResult<Void>>() {
452                 @Override
453                 public RpcResult<Void> apply(Void input) {
454                     return RpcResultBuilder.<Void> success().build();
455                 }
456             };
457
458 }