Bulk merge of l2gw changes
[netvirt.git] / elanmanager / impl / src / main / java / org / opendaylight / netvirt / elan / l2gw / ha / merge / MergeCommandsAggregator.java
1 /*
2  * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netvirt.elan.l2gw.ha.merge;
9
10 import static org.opendaylight.mdsal.binding.util.Datastore.CONFIGURATION;
11 import static org.opendaylight.mdsal.binding.util.Datastore.OPERATIONAL;
12
13 import com.google.common.cache.Cache;
14 import com.google.common.cache.CacheBuilder;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import java.util.Collection;
19 import java.util.HashMap;
20 import java.util.Map;
21 import java.util.Objects;
22 import java.util.Optional;
23 import java.util.concurrent.TimeUnit;
24 import java.util.function.BiPredicate;
25 import org.opendaylight.genius.utils.SuperTypeUtil;
26 import org.opendaylight.mdsal.binding.api.DataObjectModification;
27 import org.opendaylight.mdsal.binding.util.Datastore;
28 import org.opendaylight.mdsal.binding.util.Datastore.Configuration;
29 import org.opendaylight.mdsal.binding.util.Datastore.Operational;
30 import org.opendaylight.mdsal.binding.util.ManagedNewTransactionRunner;
31 import org.opendaylight.mdsal.binding.util.TypedReadWriteTransaction;
32 import org.opendaylight.netvirt.elan.l2gw.ha.BatchedTransaction;
33 import org.opendaylight.netvirt.elan.l2gw.ha.commands.LocalMcastCmd;
34 import org.opendaylight.netvirt.elan.l2gw.ha.commands.LocalUcastCmd;
35 import org.opendaylight.netvirt.elan.l2gw.ha.commands.MergeCommand;
36 import org.opendaylight.netvirt.elan.l2gw.ha.commands.PhysicalLocatorCmd;
37 import org.opendaylight.netvirt.elan.l2gw.ha.commands.RemoteMcastCmd;
38 import org.opendaylight.netvirt.elan.l2gw.ha.commands.RemoteUcastCmd;
39 import org.opendaylight.netvirt.elan.l2gw.ha.commands.TerminationPointCmd;
40 import org.opendaylight.netvirt.elan.l2gw.ha.commands.TunnelCmd;
41 import org.opendaylight.netvirt.elan.l2gw.ha.commands.TunnelIpCmd;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.LocalMcastMacs;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.LocalUcastMacs;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.RemoteMcastMacs;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.RemoteUcastMacs;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.physical._switch.attributes.Tunnels;
47 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
48 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPoint;
49 import org.opendaylight.yangtools.concepts.Builder;
50 import org.opendaylight.yangtools.yang.binding.DataObject;
51 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54
55 public abstract class MergeCommandsAggregator<BuilderTypeT extends Builder, AugTypeT extends DataObject> {
56
57     private static final Logger LOG = LoggerFactory.getLogger(MergeCommandsAggregator.class);
58
59     protected Map<Class<?>, MergeCommand> commands = new HashMap<>();
60
61     private final Map<Class, Boolean> operSkipCommands = new HashMap<>();
62     private final Map<Class, Boolean> configSkipCommands = new HashMap<>();
63
64     private final BiPredicate<Class<? extends Datastore>, Class> skipCopy =
65         (dsType, cmdType) -> (Configuration.class.equals(dsType) ? configSkipCommands.containsKey(cmdType)
66             : operSkipCommands.containsKey(cmdType));
67
68     private final Cache<InstanceIdentifier, Boolean> deleteInProgressIids = CacheBuilder.newBuilder()
69             .initialCapacity(50000)
70             .expireAfterWrite(600, TimeUnit.SECONDS)
71             .build();
72
73     protected MergeCommandsAggregator() {
74         operSkipCommands.put(RemoteUcastCmd.class, Boolean.TRUE);
75         operSkipCommands.put(RemoteMcastCmd.class, Boolean.TRUE);
76         operSkipCommands.put(TerminationPointCmd.class, Boolean.TRUE);
77         operSkipCommands.put(LocalMcastCmd.class, Boolean.TRUE);
78         operSkipCommands.put(PhysicalLocatorCmd.class, Boolean.TRUE);
79         operSkipCommands.put(TunnelCmd.class, Boolean.TRUE);
80
81         operSkipCommands.put(RemoteMcastMacs.class, Boolean.TRUE);
82         operSkipCommands.put(RemoteUcastMacs.class, Boolean.TRUE);
83         operSkipCommands.put(LocalMcastMacs.class, Boolean.TRUE);
84         operSkipCommands.put(TerminationPoint.class, Boolean.TRUE);
85         operSkipCommands.put(Tunnels.class, Boolean.TRUE);
86
87         configSkipCommands.put(LocalUcastCmd.class, Boolean.TRUE);
88         configSkipCommands.put(LocalUcastMacs.class, Boolean.TRUE);
89     }
90
91     protected void addCommand(MergeCommand mergeCommand) {
92         commands.put(SuperTypeUtil.getTypeParameter(mergeCommand.getClass(), 0), mergeCommand);
93     }
94
95     public void mergeOperationalData(BuilderTypeT builder,
96                                      AugTypeT existingData,
97                                      AugTypeT src,
98                                      InstanceIdentifier<Node> dstPath) {
99         for (MergeCommand cmd : commands.values()) {
100             if (skipCopy.negate().test(OPERATIONAL, cmd.getClass())) {
101                 cmd.mergeOperationalData(builder, existingData, src, dstPath);
102             }
103         }
104     }
105
106     public void mergeConfigData(BuilderTypeT builder,
107                                 AugTypeT src,
108                                 InstanceIdentifier<Node> dstPath) {
109         for (MergeCommand cmd : commands.values()) {
110             if (skipCopy.negate().test(CONFIGURATION, cmd.getClass())) {
111                 cmd.mergeConfigData(builder, src, dstPath);
112             }
113         }
114     }
115
116
117     public void mergeConfigUpdate(InstanceIdentifier<Node> dstPath,
118                                   DataObjectModification mod,
119                                   TypedReadWriteTransaction<Configuration> tx, ManagedNewTransactionRunner txRunner) {
120         mergeUpdate(dstPath, mod, CONFIGURATION, tx, txRunner);
121     }
122
123     public void mergeOpUpdate(InstanceIdentifier<Node> dstPath,
124                               DataObjectModification mod,
125                               TypedReadWriteTransaction<Operational> tx, ManagedNewTransactionRunner txRunner) {
126         mergeUpdate(dstPath, mod, OPERATIONAL, tx, txRunner);
127     }
128
129     @SuppressWarnings("illegalcatch")
130     public <D extends Datastore> void mergeUpdate(InstanceIdentifier<Node> dstPath,
131                             DataObjectModification mod,
132                             Class<D> datastoreType,
133                             TypedReadWriteTransaction<D> transaction,
134                             ManagedNewTransactionRunner txRunner) {
135         BatchedTransaction tx = null;
136         if (mod == null || mod.getModifiedChildren() == null) {
137             return;
138         }
139         if (!(transaction instanceof BatchedTransaction)) {
140             return;
141         }
142         else {
143             tx = (BatchedTransaction)transaction;
144         }
145         final BatchedTransaction transaction1 = tx;
146         String srcNodeId = transaction1.getSrcNodeId().getValue();
147         String dstNodeId = dstPath.firstKeyOf(Node.class).getNodeId().getValue();
148         Collection<DataObjectModification> modifications = mod.getModifiedChildren();
149         modifications.stream()
150                 .filter(modification -> skipCopy.negate().test(datastoreType, modification.getDataType()))
151                 .filter(modification -> commands.get(modification.getDataType()) != null)
152                 .peek(modification -> LOG.debug("Received {} modification {} copy/delete to {}",
153                         datastoreType, modification, dstPath))
154                 .forEach(modification -> {
155                     try {
156                         copyModification(dstPath, datastoreType, transaction1,
157                             srcNodeId, dstNodeId, modification, txRunner);
158                     } catch (Exception e) {
159                         LOG.error("Failed to copy mod from {} to {} {} {} id  {}",
160                             srcNodeId, dstNodeId, modification.getDataType().getSimpleName(),
161                             modification, modification.getIdentifier(), e);
162                     }
163                 });
164     }
165
166     private <D extends Datastore> void copyModification(InstanceIdentifier<Node> dstPath, Class<D> datastoreType,
167                                   BatchedTransaction tx, String srcNodeId, String dstNodeId,
168                                   DataObjectModification modification, ManagedNewTransactionRunner txRunner) {
169         DataObjectModification.ModificationType type = getModificationType(modification);
170         if (type == null) {
171             return;
172         }
173         String src = datastoreType == OPERATIONAL ? "child" : "parent";
174         MergeCommand mergeCommand = commands.get(modification.getDataType());
175         boolean create = false;
176         switch (type) {
177             case WRITE:
178             case SUBTREE_MODIFIED:
179                 DataObject dataAfter = modification.getDataAfter();
180                 if (dataAfter == null) {
181                     return;
182                 }
183                 DataObject before = modification.getDataBefore();
184                 if (Objects.equals(dataAfter, before)) {
185                     LOG.warn("Ha updated skip not modified {}", src);
186                     return;
187                 }
188
189                 create = true;
190                 break;
191             case DELETE:
192                 DataObject dataBefore = modification.getDataBefore();
193                 if (dataBefore == null) {
194                     LOG.warn("Ha updated skip delete {}", src);
195                     return;
196                 }
197                 break;
198             default:
199                 return;
200         }
201         DataObject data = create ? modification.getDataAfter() : modification.getDataBefore();
202         InstanceIdentifier<DataObject> transformedId = mergeCommand.generateId(dstPath, data);
203         if (tx.updateMetric()) {
204             LOG.info("Ha updated processing {}", src);
205         }
206         if (create) {
207             DataObject transformedItem = mergeCommand.transform(dstPath, modification.getDataAfter());
208             tx.put(transformedId, transformedItem);
209             //if tunnel ip command do this for
210             if (mergeCommand.getClass() == TunnelIpCmd.class) {
211                 if (Operational.class.equals(datastoreType)) {
212                     txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION, configTx -> {
213                         configTx.put(transformedId, transformedItem);
214                     });
215
216                 }
217             }
218         } else {
219             if (deleteInProgressIids.getIfPresent(transformedId) == null) {
220                 // TODO uncomment this code
221                 /*if (isLocalMacMoved(mergeCommand, transformedId, tx, srcNodeId, txRunner)) {
222                     return;
223                 }*/
224                 tx.delete(transformedId);
225                 if (mergeCommand.getClass() == TunnelIpCmd.class) {
226                     if (Operational.class.equals(datastoreType)) {
227                         txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION, configTx -> {
228                             tx.delete(transformedId);
229                         });
230                     }
231                 }
232                 deleteInProgressIids.put(transformedId, Boolean.TRUE);
233             } else {
234                 return;
235             }
236         }
237         String created = create ? "created" : "deleted";
238         Futures.addCallback(tx.getFt(transformedId), new FutureCallback<Void>() {
239             @Override
240             public void onSuccess(Void voidResult) {
241                 LOG.info("Ha updated skip not modified {}", mergeCommand.getDescription());
242                 deleteInProgressIids.invalidate(transformedId);
243             }
244
245             @Override
246             public void onFailure(Throwable throwable) {
247                 LOG.error("Ha failed {}", mergeCommand.getDescription());
248                 deleteInProgressIids.invalidate(transformedId);
249             }
250         }, MoreExecutors.directExecutor());
251     }
252
253     /*private boolean isLocalMacMoved(MergeCommand mergeCommand,
254                                     InstanceIdentifier<DataObject> localUcastIid,
255                                     BatchedTransaction tx,
256                                     String parentId, ManagedNewTransactionRunner txRunner) {
257         if (mergeCommand.getClass() != LocalUcastCmd.class) {
258             return false;
259         }
260         final Optional<DataObject> existingMacOptional = Optional.empty();
261             txRunner.callWithNewReadOnlyTransactionAndClose(OPERATIONAL, operTx -> {
262                 Optional<DataObject> temp = operTx.read(localUcastIid).get();
263
264             });
265                 if (!existingMacOptional.isPresent() || existingMacOptional.get() == null) {
266                     return false;
267                 }
268                 LocalUcastMacs existingMac  = (LocalUcastMacs) existingMacOptional.get();
269                 if (existingMac.augmentation(SrcnodeAugmentation.class) != null) {
270                     if (!Objects.equals(existingMac.augmentation(SrcnodeAugmentation.class).getSrcTorNodeid(),
271                         parentId)) {
272                         LOG.error("MergeCommandAggregator mac movement within tor {} {}",
273                             existingMac.augmentation(SrcnodeAugmentation.class).getSrcTorNodeid(), parentId);
274                         return true;
275                     }
276                 }
277
278         return false;
279     }*/
280
281     private DataObjectModification.ModificationType getModificationType(
282             DataObjectModification<? extends DataObject> mod) {
283         try {
284             return mod.getModificationType();
285         } catch (IllegalStateException e) {
286             //not sure why this getter throws this exception, could be some mdsal bug
287             LOG.trace("Failed to get the modification type for mod {}", mod);
288         }
289         return null;
290     }
291
292     boolean isDataUpdated(Optional<DataObject> existingDataOptional, DataObject newData) {
293         return !existingDataOptional.isPresent() || !Objects.equals(existingDataOptional.get(), newData);
294     }
295 }