2 * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netvirt.elan.l2gw.ha.merge;
10 import static org.opendaylight.mdsal.binding.util.Datastore.CONFIGURATION;
11 import static org.opendaylight.mdsal.binding.util.Datastore.OPERATIONAL;
13 import com.google.common.cache.Cache;
14 import com.google.common.cache.CacheBuilder;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import java.util.Collection;
19 import java.util.HashMap;
21 import java.util.Objects;
22 import java.util.Optional;
23 import java.util.concurrent.TimeUnit;
24 import java.util.function.BiPredicate;
25 import org.opendaylight.genius.utils.SuperTypeUtil;
26 import org.opendaylight.mdsal.binding.api.DataObjectModification;
27 import org.opendaylight.mdsal.binding.util.Datastore;
28 import org.opendaylight.mdsal.binding.util.Datastore.Configuration;
29 import org.opendaylight.mdsal.binding.util.Datastore.Operational;
30 import org.opendaylight.mdsal.binding.util.ManagedNewTransactionRunner;
31 import org.opendaylight.mdsal.binding.util.TypedReadWriteTransaction;
32 import org.opendaylight.netvirt.elan.l2gw.ha.BatchedTransaction;
33 import org.opendaylight.netvirt.elan.l2gw.ha.commands.LocalMcastCmd;
34 import org.opendaylight.netvirt.elan.l2gw.ha.commands.LocalUcastCmd;
35 import org.opendaylight.netvirt.elan.l2gw.ha.commands.MergeCommand;
36 import org.opendaylight.netvirt.elan.l2gw.ha.commands.PhysicalLocatorCmd;
37 import org.opendaylight.netvirt.elan.l2gw.ha.commands.RemoteMcastCmd;
38 import org.opendaylight.netvirt.elan.l2gw.ha.commands.RemoteUcastCmd;
39 import org.opendaylight.netvirt.elan.l2gw.ha.commands.TerminationPointCmd;
40 import org.opendaylight.netvirt.elan.l2gw.ha.commands.TunnelCmd;
41 import org.opendaylight.netvirt.elan.l2gw.ha.commands.TunnelIpCmd;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.LocalMcastMacs;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.LocalUcastMacs;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.RemoteMcastMacs;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.RemoteUcastMacs;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.physical._switch.attributes.Tunnels;
47 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
48 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPoint;
49 import org.opendaylight.yangtools.concepts.Builder;
50 import org.opendaylight.yangtools.yang.binding.DataObject;
51 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
55 public abstract class MergeCommandsAggregator<BuilderTypeT extends Builder, AugTypeT extends DataObject> {
57 private static final Logger LOG = LoggerFactory.getLogger(MergeCommandsAggregator.class);
59 protected Map<Class<?>, MergeCommand> commands = new HashMap<>();
61 private final Map<Class, Boolean> operSkipCommands = new HashMap<>();
62 private final Map<Class, Boolean> configSkipCommands = new HashMap<>();
64 private final BiPredicate<Class<? extends Datastore>, Class> skipCopy =
65 (dsType, cmdType) -> (Configuration.class.equals(dsType) ? configSkipCommands.containsKey(cmdType)
66 : operSkipCommands.containsKey(cmdType));
68 private final Cache<InstanceIdentifier, Boolean> deleteInProgressIids = CacheBuilder.newBuilder()
69 .initialCapacity(50000)
70 .expireAfterWrite(600, TimeUnit.SECONDS)
73 protected MergeCommandsAggregator() {
74 operSkipCommands.put(RemoteUcastCmd.class, Boolean.TRUE);
75 operSkipCommands.put(RemoteMcastCmd.class, Boolean.TRUE);
76 operSkipCommands.put(TerminationPointCmd.class, Boolean.TRUE);
77 operSkipCommands.put(LocalMcastCmd.class, Boolean.TRUE);
78 operSkipCommands.put(PhysicalLocatorCmd.class, Boolean.TRUE);
79 operSkipCommands.put(TunnelCmd.class, Boolean.TRUE);
81 operSkipCommands.put(RemoteMcastMacs.class, Boolean.TRUE);
82 operSkipCommands.put(RemoteUcastMacs.class, Boolean.TRUE);
83 operSkipCommands.put(LocalMcastMacs.class, Boolean.TRUE);
84 operSkipCommands.put(TerminationPoint.class, Boolean.TRUE);
85 operSkipCommands.put(Tunnels.class, Boolean.TRUE);
87 configSkipCommands.put(LocalUcastCmd.class, Boolean.TRUE);
88 configSkipCommands.put(LocalUcastMacs.class, Boolean.TRUE);
91 protected void addCommand(MergeCommand mergeCommand) {
92 commands.put(SuperTypeUtil.getTypeParameter(mergeCommand.getClass(), 0), mergeCommand);
95 public void mergeOperationalData(BuilderTypeT builder,
96 AugTypeT existingData,
98 InstanceIdentifier<Node> dstPath) {
99 for (MergeCommand cmd : commands.values()) {
100 if (skipCopy.negate().test(OPERATIONAL, cmd.getClass())) {
101 cmd.mergeOperationalData(builder, existingData, src, dstPath);
106 public void mergeConfigData(BuilderTypeT builder,
108 InstanceIdentifier<Node> dstPath) {
109 for (MergeCommand cmd : commands.values()) {
110 if (skipCopy.negate().test(CONFIGURATION, cmd.getClass())) {
111 cmd.mergeConfigData(builder, src, dstPath);
117 public void mergeConfigUpdate(InstanceIdentifier<Node> dstPath,
118 DataObjectModification mod,
119 TypedReadWriteTransaction<Configuration> tx, ManagedNewTransactionRunner txRunner) {
120 mergeUpdate(dstPath, mod, CONFIGURATION, tx, txRunner);
123 public void mergeOpUpdate(InstanceIdentifier<Node> dstPath,
124 DataObjectModification mod,
125 TypedReadWriteTransaction<Operational> tx, ManagedNewTransactionRunner txRunner) {
126 mergeUpdate(dstPath, mod, OPERATIONAL, tx, txRunner);
129 @SuppressWarnings("illegalcatch")
130 public <D extends Datastore> void mergeUpdate(InstanceIdentifier<Node> dstPath,
131 DataObjectModification mod,
132 Class<D> datastoreType,
133 TypedReadWriteTransaction<D> transaction,
134 ManagedNewTransactionRunner txRunner) {
135 BatchedTransaction tx = null;
136 if (mod == null || mod.getModifiedChildren() == null) {
139 if (!(transaction instanceof BatchedTransaction)) {
143 tx = (BatchedTransaction)transaction;
145 final BatchedTransaction transaction1 = tx;
146 String srcNodeId = transaction1.getSrcNodeId().getValue();
147 String dstNodeId = dstPath.firstKeyOf(Node.class).getNodeId().getValue();
148 Collection<DataObjectModification> modifications = mod.getModifiedChildren();
149 modifications.stream()
150 .filter(modification -> skipCopy.negate().test(datastoreType, modification.getDataType()))
151 .filter(modification -> commands.get(modification.getDataType()) != null)
152 .peek(modification -> LOG.debug("Received {} modification {} copy/delete to {}",
153 datastoreType, modification, dstPath))
154 .forEach(modification -> {
156 copyModification(dstPath, datastoreType, transaction1,
157 srcNodeId, dstNodeId, modification, txRunner);
158 } catch (Exception e) {
159 LOG.error("Failed to copy mod from {} to {} {} {} id {}",
160 srcNodeId, dstNodeId, modification.getDataType().getSimpleName(),
161 modification, modification.getIdentifier(), e);
166 private <D extends Datastore> void copyModification(InstanceIdentifier<Node> dstPath, Class<D> datastoreType,
167 BatchedTransaction tx, String srcNodeId, String dstNodeId,
168 DataObjectModification modification, ManagedNewTransactionRunner txRunner) {
169 DataObjectModification.ModificationType type = getModificationType(modification);
173 String src = datastoreType == OPERATIONAL ? "child" : "parent";
174 MergeCommand mergeCommand = commands.get(modification.getDataType());
175 boolean create = false;
178 case SUBTREE_MODIFIED:
179 DataObject dataAfter = modification.getDataAfter();
180 if (dataAfter == null) {
183 DataObject before = modification.getDataBefore();
184 if (Objects.equals(dataAfter, before)) {
185 LOG.warn("Ha updated skip not modified {}", src);
192 DataObject dataBefore = modification.getDataBefore();
193 if (dataBefore == null) {
194 LOG.warn("Ha updated skip delete {}", src);
201 DataObject data = create ? modification.getDataAfter() : modification.getDataBefore();
202 InstanceIdentifier<DataObject> transformedId = mergeCommand.generateId(dstPath, data);
203 if (tx.updateMetric()) {
204 LOG.info("Ha updated processing {}", src);
207 DataObject transformedItem = mergeCommand.transform(dstPath, modification.getDataAfter());
208 tx.put(transformedId, transformedItem);
209 //if tunnel ip command do this for
210 if (mergeCommand.getClass() == TunnelIpCmd.class) {
211 if (Operational.class.equals(datastoreType)) {
212 txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION, configTx -> {
213 configTx.put(transformedId, transformedItem);
219 if (deleteInProgressIids.getIfPresent(transformedId) == null) {
220 // TODO uncomment this code
221 /*if (isLocalMacMoved(mergeCommand, transformedId, tx, srcNodeId, txRunner)) {
224 tx.delete(transformedId);
225 if (mergeCommand.getClass() == TunnelIpCmd.class) {
226 if (Operational.class.equals(datastoreType)) {
227 txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION, configTx -> {
228 tx.delete(transformedId);
232 deleteInProgressIids.put(transformedId, Boolean.TRUE);
237 String created = create ? "created" : "deleted";
238 Futures.addCallback(tx.getFt(transformedId), new FutureCallback<Void>() {
240 public void onSuccess(Void voidResult) {
241 LOG.info("Ha updated skip not modified {}", mergeCommand.getDescription());
242 deleteInProgressIids.invalidate(transformedId);
246 public void onFailure(Throwable throwable) {
247 LOG.error("Ha failed {}", mergeCommand.getDescription());
248 deleteInProgressIids.invalidate(transformedId);
250 }, MoreExecutors.directExecutor());
253 /*private boolean isLocalMacMoved(MergeCommand mergeCommand,
254 InstanceIdentifier<DataObject> localUcastIid,
255 BatchedTransaction tx,
256 String parentId, ManagedNewTransactionRunner txRunner) {
257 if (mergeCommand.getClass() != LocalUcastCmd.class) {
260 final Optional<DataObject> existingMacOptional = Optional.empty();
261 txRunner.callWithNewReadOnlyTransactionAndClose(OPERATIONAL, operTx -> {
262 Optional<DataObject> temp = operTx.read(localUcastIid).get();
265 if (!existingMacOptional.isPresent() || existingMacOptional.get() == null) {
268 LocalUcastMacs existingMac = (LocalUcastMacs) existingMacOptional.get();
269 if (existingMac.augmentation(SrcnodeAugmentation.class) != null) {
270 if (!Objects.equals(existingMac.augmentation(SrcnodeAugmentation.class).getSrcTorNodeid(),
272 LOG.error("MergeCommandAggregator mac movement within tor {} {}",
273 existingMac.augmentation(SrcnodeAugmentation.class).getSrcTorNodeid(), parentId);
281 private DataObjectModification.ModificationType getModificationType(
282 DataObjectModification<? extends DataObject> mod) {
284 return mod.getModificationType();
285 } catch (IllegalStateException e) {
286 //not sure why this getter throws this exception, could be some mdsal bug
287 LOG.trace("Failed to get the modification type for mod {}", mod);
292 boolean isDataUpdated(Optional<DataObject> existingDataOptional, DataObject newData) {
293 return !existingDataOptional.isPresent() || !Objects.equals(existingDataOptional.get(), newData);