f3861bad65f8669fe51683c4dc82a5010d8b9659
[netvirt.git] /
1 /*
2  * Copyright (c) 2017 Hewlett Packard Enterprise, Co. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netvirt.federation.plugin;
9
10 import com.google.common.base.Optional;
11 import com.google.common.util.concurrent.UncheckedExecutionException;
12
13 import java.util.ArrayList;
14 import java.util.Collection;
15 import java.util.Collections;
16 import java.util.List;
17 import java.util.Map;
18 import java.util.Map.Entry;
19 import java.util.concurrent.CompletableFuture;
20
21 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
22 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType;
23 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
24 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
25 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
26 import org.opendaylight.federation.plugin.spi.IFederationPluginEgress;
27 import org.opendaylight.federation.service.api.IFederationProducerMgr;
28 import org.opendaylight.federation.service.api.federationutil.FederationUtils;
29 import org.opendaylight.federation.service.common.api.EntityFederationMessage;
30 import org.opendaylight.federation.service.common.api.ListenerData;
31 import org.opendaylight.netvirt.federation.plugin.filters.FilterResult;
32 import org.opendaylight.yangtools.yang.binding.DataObject;
33 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
34 import org.slf4j.Logger;
35
36
37 public class FederationPluginEgress implements IFederationPluginEgress {
38
    // Logger for this instance; created in the constructor through FederationUtils.createLogger.
    private final Logger logger;
    // Producer manager that egress messages are published through.
    private final IFederationProducerMgr producerMgr;
    // Queue name: used for the logger, embedded in each created message and passed to publishMessage.
    private final String queueName;
    // Context id passed to publishMessage together with the queue name.
    private final String contextId;
    // Built from the federated network pairs; handed to the egress filter and the egress transformation.
    private final FederatedMappings federatedMappings;
    // Modifications parked either by a QUEUE filter result or after a DELETE on a liberator key.
    private final PendingModificationCache<DataTreeModification<? extends DataObject>> pendingModifications = //
            new PendingModificationCache<>();

    // Set by abort(); once true, steadyData and fullSyncData only increment their "aborted" counters.
    private volatile boolean aborted = false;

    static {
        // Runs once when the class is loaded.
        // NOTE(review): presumably registers the YANG modules the plugin relies on - confirm in FederationPluginUtils.
        FederationPluginUtils.initYangModules();
    }
52
53     public FederationPluginEgress(final IFederationProducerMgr producerMgr,
54             List<FederatedNetworkPair> federatedNetworkPairs, String queueName, String contextId) {
55         this.producerMgr = producerMgr;
56         this.queueName = queueName;
57         this.contextId = contextId;
58         logger = FederationUtils.createLogger(queueName, FederationPluginEgress.class);
59         federatedMappings = new FederatedMappings(federatedNetworkPairs);
60     }
61
62     @Override
63     public synchronized void steadyData(String listenerKey,
64             Collection<DataTreeModification<? extends DataObject>> dataTreeModifications) {
65         if (!aborted) {
66             FederationPluginCounters.egress_steady_data.inc();
67             processDataTreeModifications(listenerKey, dataTreeModifications, false);
68         } else {
69             FederationPluginCounters.egress_steady_data_aborted.inc();
70         }
71     }
72
73     @SuppressWarnings({ "rawtypes", "unchecked" })
74     @Override
75     public synchronized void fullSyncData(String listenerKey, Optional existingData) {
76         if (aborted) {
77             FederationPluginCounters.egress_full_sync_aborted.inc();
78             return;
79         }
80
81         FederationPluginCounters.egress_full_sync.inc();
82         Collection dataTreeModifications = createModifications(listenerKey, existingData);
83         processDataTreeModifications(listenerKey, dataTreeModifications, true);
84     }
85
86     @Override
87     public List<ListenerData> getListenersData() {
88         List<ListenerData> listenersData = new ArrayList<>();
89         for (String listenerKey : FederationPluginUtils.getOrderedListenerKeys()) {
90             LogicalDatastoreType datastoreType = FederationPluginUtils.getListenerDatastoreType(listenerKey);
91             if (datastoreType == null) {
92                 logger.error("Failed to get datastore type for listener {}. Ignoring listener key", listenerKey);
93                 continue;
94             }
95
96             InstanceIdentifier<?> instanceIdentifierForListener = FederationPluginUtils
97                     .getInstanceIdentifier(listenerKey);
98             if (instanceIdentifierForListener == null) {
99                 logger.error("Failed to get instance identifier of listener for listener key {}. Ignoring listener key",
100                         listenerKey);
101                 continue;
102             }
103
104             InstanceIdentifier<?> instanceIdentifierForExistingData = FederationPluginUtils
105                     .getParentInstanceIdentifier(listenerKey);
106             if (instanceIdentifierForExistingData == null) {
107                 logger.error(
108                         "Failed to get instance identifier of existing data for listener key {}. Ignoring listener key",
109                         listenerKey);
110                 continue;
111             }
112
113             ListenerData listenerData = new ListenerData(listenerKey,
114                     new DataTreeIdentifier<>(datastoreType, instanceIdentifierForListener),
115                     new DataTreeIdentifier<>(datastoreType, instanceIdentifierForExistingData));
116             listenersData.add(listenerData);
117         }
118
119         logger.debug("Listener keys {}", listenersData);
120         return listenersData;
121     }
122
123     @Override
124     public void cleanup() {
125         pendingModifications.cleanup();
126     }
127
128     private void processDataTreeModifications(String listenerKey,
129             Collection<DataTreeModification<? extends DataObject>> dataTreeModifications, boolean isFullSync) {
130         if (dataTreeModifications == null) {
131             return;
132         }
133
134         for (DataTreeModification<? extends DataObject> dataTreeModification : dataTreeModifications) {
135             if (isSpuriousModification(dataTreeModification)) {
136                 continue;
137             }
138             processDataTreeModification(listenerKey, dataTreeModification, isFullSync);
139         }
140     }
141
142     private boolean isSpuriousModification(DataTreeModification<? extends DataObject> dataTreeModification) {
143         if (dataTreeModification == null) {
144             return true;
145         }
146         DataObjectModification<? extends DataObject> rootNode = dataTreeModification.getRootNode();
147         if (rootNode.getDataBefore() != null && rootNode.getDataAfter() != null
148                 && rootNode.getDataBefore().equals(rootNode.getDataAfter())) {
149             return true;
150         }
151         return false;
152     }
153
154     private <T extends DataObject> void processDataTreeModification(String listenerKey,
155             DataTreeModification<T> dataTreeModification, boolean publishInTx) {
156         T dataObject = FederationPluginUtils.getDataObjectFromModification(dataTreeModification);
157         if (dataObject == null) {
158             logger.warn("Failed to get DataObject from {}", dataObject);
159             return;
160         }
161
162         if (!applyFilter(listenerKey, dataObject, dataTreeModification)) {
163             logger.trace("listener {} filtered out", listenerKey);
164             return;
165         }
166
167         // process queued modifications associated with this modification
168         processPendingDataTreeModifications(listenerKey, dataObject, publishInTx);
169         // queue deleted modification for future use if required
170         if (ModificationType.DELETE.equals(dataTreeModification.getRootNode().getModificationType())
171                 && PendingModificationCache.isLiberatorKey(listenerKey)) {
172             addPendingModification(listenerKey, dataObject, dataTreeModification);
173         }
174         // publish the modification to the federation
175         publishDataTreeModification(listenerKey, dataObject, dataTreeModification, publishInTx);
176     }
177
178     private <T extends DataObject> void processPendingDataTreeModifications(String listenerKey, T dataObject,
179             boolean publishInTx) {
180         Map<String, Collection<DataTreeModification<? extends DataObject>>>
181             associatedModifications = removePendingModifications(listenerKey, dataObject);
182         if (associatedModifications != null) {
183             for (Entry<String, Collection<DataTreeModification<? extends DataObject>>> entry : associatedModifications
184                     .entrySet()) {
185                 for (DataTreeModification<? extends DataObject> modification : entry.getValue()) {
186                     processPendingDataTreeModification(entry.getKey(), modification, publishInTx);
187                 }
188             }
189         }
190     }
191
192     private <T extends DataObject> void processPendingDataTreeModification(String listenerKey,
193             DataTreeModification<T> dataTreeModification, boolean publishInTx) {
194         T dataObject = FederationPluginUtils.getDataObjectFromModification(dataTreeModification);
195         if (dataObject == null) {
196             logger.warn("Failed to get DataObject from {}", dataObject);
197             return;
198         }
199
200         FederationPluginCounters.egress_process_pending_modification.inc();
201         publishDataTreeModification(listenerKey, dataObject, dataTreeModification, publishInTx);
202     }
203
204     private <T extends DataObject, S extends DataObject> void publishDataTreeModification(String listenerKey,
205             S dataObject, DataTreeModification<S> dataTreeModification, boolean publishInTx) {
206         T transformedObject = FederationPluginUtils.applyEgressTransformation(listenerKey, dataObject,
207                 federatedMappings, pendingModifications);
208         if (transformedObject == null) {
209             FederationPluginCounters.egress_transformation_failed.inc();
210             logger.error("Failed to transform {} for listener {}", dataObject, listenerKey);
211             return;
212         }
213
214         EntityFederationMessage<T> msg = createEntityFederationMsgFromDataObject(listenerKey, transformedObject,
215                 dataTreeModification);
216         FederationPluginCounters.egress_publish_modification.inc();
217         logger.trace("Publishing {} for listener {}", transformedObject, listenerKey);
218         producerMgr.publishMessage(msg, queueName, contextId);
219     }
220
221     private <T extends DataObject> boolean applyFilter(String listenerKey, T dataObject,
222             DataTreeModification<T> dataTreeModification) {
223         FilterResult filterResult = FederationPluginUtils.applyEgressFilter(listenerKey, dataObject, federatedMappings,
224                 pendingModifications, dataTreeModification);
225         if (filterResult == null) {
226             logger.warn("Failed to get FilterResult for {} {}", listenerKey, dataObject);
227             return false;
228         }
229
230         logger.trace("{} filter result {}", listenerKey, filterResult);
231         switch (filterResult) {
232             case DENY:
233                 FederationPluginCounters.egress_filter_result_deny.inc();
234                 return false;
235             case ACCEPT:
236                 FederationPluginCounters.egress_filter_result_accept.inc();
237                 return true;
238             case QUEUE:
239                 FederationPluginCounters.egress_filter_result_queue.inc();
240                 addPendingModification(listenerKey, dataObject, dataTreeModification);
241                 return false;
242             default:
243                 logger.error("Didn't find a match for the filter result {}", filterResult.toString());
244                 return false;
245         }
246     }
247
248     private <T extends DataObject> void addPendingModification(String listenerKey, T dataObject,
249             DataTreeModification<? extends DataObject> dataTreeModification) {
250         logger.trace("Add pending modification {} listener {}", dataObject, listenerKey);
251         pendingModifications.add(dataObject, listenerKey, dataTreeModification);
252     }
253
254     private <T extends DataObject> Map<String, Collection<DataTreeModification<? extends DataObject>>>
255         removePendingModifications(String listenerKey, T dataObject) {
256         if (!PendingModificationCache.isLiberatorKey(listenerKey)) {
257             return null;
258         }
259
260         logger.trace("Remove pending modifications for listener {}", listenerKey);
261         return pendingModifications.remove(dataObject);
262     }
263
264     @SuppressWarnings({ "unchecked" })
265     private <T extends DataObject, S extends DataObject> EntityFederationMessage<T>
266         createEntityFederationMsgFromDataObject(String listenerKey, T dataObject,
267                 DataTreeModification<S> dataTreeModification) {
268         DataObjectModification<S> dataObjectModification = dataTreeModification.getRootNode();
269         ModificationType modificationType = dataObjectModification.getModificationType();
270         InstanceIdentifier<T> instanceIdentifier = (InstanceIdentifier<T>) FederationPluginUtils
271                 .getSubtreeInstanceIdentifier(listenerKey);
272         LogicalDatastoreType datastoreType = FederationPluginUtils.getListenerDatastoreType(listenerKey);
273         EntityFederationMessage<T> msg = createMsgWithRetriesMechanism(dataObject, modificationType, instanceIdentifier,
274                 datastoreType, 2);
275         return msg;
276     }
277
278     /**
279      * This attempts to workaround
280      * https://bugs.opendaylight.org/show_bug.cgi?id=7420.
281      */
282     @SuppressWarnings({ "rawtypes", "unchecked", "checkstyle:emptyblock" })
283     private <T extends DataObject, S extends DataObject> EntityFederationMessage<T> createMsgWithRetriesMechanism(
284             T dataObject, ModificationType modificationType, InstanceIdentifier<T> instanceIdentifier,
285             LogicalDatastoreType datastoreType, int remainingRetries) {
286         try {
287             EntityFederationMessage msg = new EntityFederationMessage(datastoreType.toString(),
288                     modificationType.toString(), null, queueName, instanceIdentifier, dataObject);
289             return msg;
290         } catch (UncheckedExecutionException t) {
291             if (remainingRetries > 0) {
292
293                 logger.warn("Create EntityFederationMessage failed because of frozen class. retrying...", t);
294                 try {
295                     Thread.sleep(1000);
296                 } catch (InterruptedException e) {
297                 }
298                 createMsgWithRetriesMechanism(dataObject, modificationType, instanceIdentifier, datastoreType,
299                         --remainingRetries);
300             }
301         }
302
303         logger.error("Failed to create EntityFederationMessage due to frozen class. aborting creation");
304         return null;
305     }
306
307     @Override
308     public synchronized CompletableFuture<Void> abort() {
309         aborted = true;
310         return CompletableFuture.completedFuture(null);
311     }
312
313     @SuppressWarnings("rawtypes")
314     private <T extends DataObject> Collection<DataTreeModification<T>> createModifications(String listenerKey,
315             Optional existingData) {
316         if (existingData.isPresent()) {
317             return FederationPluginUtils.createModifications(listenerKey, (DataObject) existingData.get());
318         }
319
320         FederationPluginCounters.egress_no_existing_data.inc();
321         return Collections.emptyList();
322     }
323
324 }