2 * Copyright (c) 2017 Hewlett Packard Enterprise, Co. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netvirt.federation.plugin;
10 import com.google.common.base.Optional;
11 import com.google.common.util.concurrent.UncheckedExecutionException;
13 import java.util.ArrayList;
14 import java.util.Collection;
15 import java.util.Collections;
16 import java.util.List;
18 import java.util.Map.Entry;
19 import java.util.concurrent.CompletableFuture;
21 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
22 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType;
23 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
24 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
25 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
26 import org.opendaylight.federation.plugin.spi.IFederationPluginEgress;
27 import org.opendaylight.federation.service.api.IFederationProducerMgr;
28 import org.opendaylight.federation.service.api.federationutil.FederationUtils;
29 import org.opendaylight.federation.service.common.api.EntityFederationMessage;
30 import org.opendaylight.federation.service.common.api.ListenerData;
31 import org.opendaylight.netvirt.federation.plugin.filters.FilterResult;
32 import org.opendaylight.yangtools.yang.binding.DataObject;
33 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
34 import org.slf4j.Logger;
37 public class FederationPluginEgress implements IFederationPluginEgress {
39 private final Logger logger;
40 private final IFederationProducerMgr producerMgr;
41 private final String queueName;
42 private final String contextId;
43 private final FederatedMappings federatedMappings;
44 private final PendingModificationCache<DataTreeModification<? extends DataObject>> pendingModifications = //
45 new PendingModificationCache<>();
47 private volatile boolean aborted = false;
50 FederationPluginUtils.initYangModules();
53 public FederationPluginEgress(final IFederationProducerMgr producerMgr,
54 List<FederatedNetworkPair> federatedNetworkPairs, String queueName, String contextId) {
55 this.producerMgr = producerMgr;
56 this.queueName = queueName;
57 this.contextId = contextId;
58 logger = FederationUtils.createLogger(queueName, FederationPluginEgress.class);
59 federatedMappings = new FederatedMappings(federatedNetworkPairs);
63 public synchronized void steadyData(String listenerKey,
64 Collection<DataTreeModification<? extends DataObject>> dataTreeModifications) {
66 FederationPluginCounters.egress_steady_data.inc();
67 processDataTreeModifications(listenerKey, dataTreeModifications, false);
69 FederationPluginCounters.egress_steady_data_aborted.inc();
73 @SuppressWarnings({ "rawtypes", "unchecked" })
75 public synchronized void fullSyncData(String listenerKey, Optional existingData) {
77 FederationPluginCounters.egress_full_sync_aborted.inc();
81 FederationPluginCounters.egress_full_sync.inc();
82 Collection dataTreeModifications = createModifications(listenerKey, existingData);
83 processDataTreeModifications(listenerKey, dataTreeModifications, true);
87 public List<ListenerData> getListenersData() {
88 List<ListenerData> listenersData = new ArrayList<>();
89 for (String listenerKey : FederationPluginUtils.getOrderedListenerKeys()) {
90 LogicalDatastoreType datastoreType = FederationPluginUtils.getListenerDatastoreType(listenerKey);
91 if (datastoreType == null) {
92 logger.error("Failed to get datastore type for listener {}. Ignoring listener key", listenerKey);
96 InstanceIdentifier<?> instanceIdentifierForListener = FederationPluginUtils
97 .getInstanceIdentifier(listenerKey);
98 if (instanceIdentifierForListener == null) {
99 logger.error("Failed to get instance identifier of listener for listener key {}. Ignoring listener key",
104 InstanceIdentifier<?> instanceIdentifierForExistingData = FederationPluginUtils
105 .getParentInstanceIdentifier(listenerKey);
106 if (instanceIdentifierForExistingData == null) {
108 "Failed to get instance identifier of existing data for listener key {}. Ignoring listener key",
113 ListenerData listenerData = new ListenerData(listenerKey,
114 new DataTreeIdentifier<>(datastoreType, instanceIdentifierForListener),
115 new DataTreeIdentifier<>(datastoreType, instanceIdentifierForExistingData));
116 listenersData.add(listenerData);
119 logger.debug("Listener keys {}", listenersData);
120 return listenersData;
124 public void cleanup() {
125 pendingModifications.cleanup();
128 private void processDataTreeModifications(String listenerKey,
129 Collection<DataTreeModification<? extends DataObject>> dataTreeModifications, boolean isFullSync) {
130 if (dataTreeModifications == null) {
134 for (DataTreeModification<? extends DataObject> dataTreeModification : dataTreeModifications) {
135 if (isSpuriousModification(dataTreeModification)) {
138 processDataTreeModification(listenerKey, dataTreeModification, isFullSync);
142 private boolean isSpuriousModification(DataTreeModification<? extends DataObject> dataTreeModification) {
143 if (dataTreeModification == null) {
146 DataObjectModification<? extends DataObject> rootNode = dataTreeModification.getRootNode();
147 if (rootNode.getDataBefore() != null && rootNode.getDataAfter() != null
148 && rootNode.getDataBefore().equals(rootNode.getDataAfter())) {
154 private <T extends DataObject> void processDataTreeModification(String listenerKey,
155 DataTreeModification<T> dataTreeModification, boolean publishInTx) {
156 T dataObject = FederationPluginUtils.getDataObjectFromModification(dataTreeModification);
157 if (dataObject == null) {
158 logger.warn("Failed to get DataObject from {}", dataObject);
162 if (!applyFilter(listenerKey, dataObject, dataTreeModification)) {
163 logger.trace("listener {} filtered out", listenerKey);
167 // process queued modifications associated with this modification
168 processPendingDataTreeModifications(listenerKey, dataObject, publishInTx);
169 // queue deleted modification for future use if required
170 if (ModificationType.DELETE.equals(dataTreeModification.getRootNode().getModificationType())
171 && PendingModificationCache.isLiberatorKey(listenerKey)) {
172 addPendingModification(listenerKey, dataObject, dataTreeModification);
174 // publish the modification to the federation
175 publishDataTreeModification(listenerKey, dataObject, dataTreeModification, publishInTx);
178 private <T extends DataObject> void processPendingDataTreeModifications(String listenerKey, T dataObject,
179 boolean publishInTx) {
180 Map<String, Collection<DataTreeModification<? extends DataObject>>>
181 associatedModifications = removePendingModifications(listenerKey, dataObject);
182 if (associatedModifications != null) {
183 for (Entry<String, Collection<DataTreeModification<? extends DataObject>>> entry : associatedModifications
185 for (DataTreeModification<? extends DataObject> modification : entry.getValue()) {
186 processPendingDataTreeModification(entry.getKey(), modification, publishInTx);
192 private <T extends DataObject> void processPendingDataTreeModification(String listenerKey,
193 DataTreeModification<T> dataTreeModification, boolean publishInTx) {
194 T dataObject = FederationPluginUtils.getDataObjectFromModification(dataTreeModification);
195 if (dataObject == null) {
196 logger.warn("Failed to get DataObject from {}", dataObject);
200 FederationPluginCounters.egress_process_pending_modification.inc();
201 publishDataTreeModification(listenerKey, dataObject, dataTreeModification, publishInTx);
204 private <T extends DataObject, S extends DataObject> void publishDataTreeModification(String listenerKey,
205 S dataObject, DataTreeModification<S> dataTreeModification, boolean publishInTx) {
206 T transformedObject = FederationPluginUtils.applyEgressTransformation(listenerKey, dataObject,
207 federatedMappings, pendingModifications);
208 if (transformedObject == null) {
209 FederationPluginCounters.egress_transformation_failed.inc();
210 logger.error("Failed to transform {} for listener {}", dataObject, listenerKey);
214 EntityFederationMessage<T> msg = createEntityFederationMsgFromDataObject(listenerKey, transformedObject,
215 dataTreeModification);
216 FederationPluginCounters.egress_publish_modification.inc();
217 logger.trace("Publishing {} for listener {}", transformedObject, listenerKey);
218 producerMgr.publishMessage(msg, queueName, contextId);
221 private <T extends DataObject> boolean applyFilter(String listenerKey, T dataObject,
222 DataTreeModification<T> dataTreeModification) {
223 FilterResult filterResult = FederationPluginUtils.applyEgressFilter(listenerKey, dataObject, federatedMappings,
224 pendingModifications, dataTreeModification);
225 if (filterResult == null) {
226 logger.warn("Failed to get FilterResult for {} {}", listenerKey, dataObject);
230 logger.trace("{} filter result {}", listenerKey, filterResult);
231 switch (filterResult) {
233 FederationPluginCounters.egress_filter_result_deny.inc();
236 FederationPluginCounters.egress_filter_result_accept.inc();
239 FederationPluginCounters.egress_filter_result_queue.inc();
240 addPendingModification(listenerKey, dataObject, dataTreeModification);
243 logger.error("Didn't find a match for the filter result {}", filterResult.toString());
248 private <T extends DataObject> void addPendingModification(String listenerKey, T dataObject,
249 DataTreeModification<? extends DataObject> dataTreeModification) {
250 logger.trace("Add pending modification {} listener {}", dataObject, listenerKey);
251 pendingModifications.add(dataObject, listenerKey, dataTreeModification);
254 private <T extends DataObject> Map<String, Collection<DataTreeModification<? extends DataObject>>>
255 removePendingModifications(String listenerKey, T dataObject) {
256 if (!PendingModificationCache.isLiberatorKey(listenerKey)) {
260 logger.trace("Remove pending modifications for listener {}", listenerKey);
261 return pendingModifications.remove(dataObject);
264 @SuppressWarnings({ "unchecked" })
265 private <T extends DataObject, S extends DataObject> EntityFederationMessage<T>
266 createEntityFederationMsgFromDataObject(String listenerKey, T dataObject,
267 DataTreeModification<S> dataTreeModification) {
268 DataObjectModification<S> dataObjectModification = dataTreeModification.getRootNode();
269 ModificationType modificationType = dataObjectModification.getModificationType();
270 InstanceIdentifier<T> instanceIdentifier = (InstanceIdentifier<T>) FederationPluginUtils
271 .getSubtreeInstanceIdentifier(listenerKey);
272 LogicalDatastoreType datastoreType = FederationPluginUtils.getListenerDatastoreType(listenerKey);
273 EntityFederationMessage<T> msg = createMsgWithRetriesMechanism(dataObject, modificationType, instanceIdentifier,
279 * This attempts to workaround
280 * https://bugs.opendaylight.org/show_bug.cgi?id=7420.
282 @SuppressWarnings({ "rawtypes", "unchecked", "checkstyle:emptyblock" })
283 private <T extends DataObject, S extends DataObject> EntityFederationMessage<T> createMsgWithRetriesMechanism(
284 T dataObject, ModificationType modificationType, InstanceIdentifier<T> instanceIdentifier,
285 LogicalDatastoreType datastoreType, int remainingRetries) {
287 EntityFederationMessage msg = new EntityFederationMessage(datastoreType.toString(),
288 modificationType.toString(), null, queueName, instanceIdentifier, dataObject);
290 } catch (UncheckedExecutionException t) {
291 if (remainingRetries > 0) {
293 logger.warn("Create EntityFederationMessage failed because of frozen class. retrying...", t);
296 } catch (InterruptedException e) {
298 createMsgWithRetriesMechanism(dataObject, modificationType, instanceIdentifier, datastoreType,
303 logger.error("Failed to create EntityFederationMessage due to frozen class. aborting creation");
308 public synchronized CompletableFuture<Void> abort() {
310 return CompletableFuture.completedFuture(null);
313 @SuppressWarnings("rawtypes")
314 private <T extends DataObject> Collection<DataTreeModification<T>> createModifications(String listenerKey,
315 Optional existingData) {
316 if (existingData.isPresent()) {
317 return FederationPluginUtils.createModifications(listenerKey, (DataObject) existingData.get());
320 FederationPluginCounters.egress_no_existing_data.inc();
321 return Collections.emptyList();