2 * Copyright (c) 2017 - 2018 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.netvirt.coe.listeners;
11 import static org.opendaylight.genius.infra.Datastore.CONFIGURATION;
12 import static org.opendaylight.genius.infra.Datastore.OPERATIONAL;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import java.util.ArrayList;
16 import java.util.Collection;
17 import java.util.List;
18 import java.util.Objects;
19 import java.util.concurrent.Callable;
20 import javax.annotation.Nonnull;
21 import javax.annotation.PreDestroy;
22 import javax.inject.Inject;
23 import javax.inject.Singleton;
24 import org.apache.aries.blueprint.annotation.service.Reference;
25 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
26 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
27 import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
28 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
29 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
30 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
31 import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
32 import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
33 import org.opendaylight.infrautils.jobcoordinator.JobCoordinator;
34 import org.opendaylight.netvirt.coe.utils.CoeUtils;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.coe.northbound.pod.rev170611.Coe;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.coe.northbound.pod.rev170611.coe.Pods;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.coe.northbound.pod.rev170611.pod_attributes.Interface;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.elan.rev150602.elan.instances.ElanInstance;
39 import org.opendaylight.yangtools.concepts.ListenerRegistration;
40 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
45 public class PodListener implements DataTreeChangeListener<Pods> {
47 private static final Logger LOG = LoggerFactory.getLogger(PodListener.class);
48 private ListenerRegistration<PodListener> listenerRegistration;
49 private final JobCoordinator jobCoordinator;
50 private final ManagedNewTransactionRunner txRunner;
51 private final CoeUtils coeUtils;
54 public PodListener(@Reference final DataBroker dataBroker, @Reference JobCoordinator jobCoordinator,
56 registerListener(LogicalDatastoreType.CONFIGURATION, dataBroker);
57 this.txRunner = new ManagedNewTransactionRunnerImpl(dataBroker);
58 this.jobCoordinator = jobCoordinator;
59 this.coeUtils = coeUtils;
62 protected InstanceIdentifier<Pods> getWildCardPath() {
63 return InstanceIdentifier.create(Coe.class).child(Pods.class);
66 public void registerListener(LogicalDatastoreType dsType, final DataBroker db) {
67 final DataTreeIdentifier<Pods> treeId = new DataTreeIdentifier<>(dsType, getWildCardPath());
68 listenerRegistration = db.registerDataTreeChangeListener(treeId, PodListener.this);
73 if (listenerRegistration != null) {
75 listenerRegistration.close();
77 listenerRegistration = null;
83 public void onDataTreeChanged(@Nonnull Collection<DataTreeModification<Pods>> collection) {
85 podsDataTreeModification -> podsDataTreeModification.getRootNode().getModifiedChildren().stream()
87 dataObjectModification -> dataObjectModification.getDataType().equals(Interface.class))
88 .forEach(dataObjectModification -> onPodInterfacesChanged(
89 (DataObjectModification<Interface>) dataObjectModification,
90 podsDataTreeModification.getRootPath().getRootIdentifier(),
91 podsDataTreeModification.getRootNode()))
95 public void onPodInterfacesChanged(final DataObjectModification<Interface> dataObjectModification,
96 final InstanceIdentifier<Pods> rootIdentifier,
97 DataObjectModification<Pods> rootNode) {
98 Pods pods = rootNode.getDataAfter();
99 Pods podsBefore = rootNode.getDataBefore();
100 Interface podInterfaceBefore = dataObjectModification.getDataBefore();
101 Interface podInterfaceAfter = dataObjectModification.getDataAfter();
102 switch (dataObjectModification.getModificationType()) {
104 remove(podsBefore, podInterfaceBefore);
106 case SUBTREE_MODIFIED:
107 update(rootIdentifier, pods, podsBefore, podInterfaceBefore, podInterfaceAfter);
110 if (podInterfaceBefore == null) {
111 add(rootIdentifier, pods, podInterfaceAfter);
113 update(rootIdentifier, pods, podsBefore, podInterfaceBefore,
118 LOG.error("Unhandled Modificiation Type{} for {}", dataObjectModification.getModificationType(),
123 private void add(InstanceIdentifier<Pods> instanceIdentifier, Pods pods, Interface podInterface) {
124 LOG.trace("Pod added {}",pods);
125 if (pods.getNetworkNS() == null || pods.getHostIpAddress() == null) {
126 LOG.warn("pod {} added with insufficient information to process", pods.getName());
129 jobCoordinator.enqueueJob(pods.getName(), new PodConfigAddWorker(txRunner, coeUtils,
130 instanceIdentifier, pods, podInterface));
133 private void update(InstanceIdentifier<Pods> instanceIdentifier, Pods podsAfter, Pods podsBefore,
134 Interface podInterfaceBefore, Interface podInterfaceAfter) {
135 LOG.trace("Pod updated before :{}, after :{}",podsBefore, podsAfter);
136 if (!Objects.equals(podsAfter.getHostIpAddress(), podsBefore.getHostIpAddress())
137 || !Objects.equals(podInterfaceBefore.getIpAddress(), podInterfaceAfter.getIpAddress())) {
138 //if (podsBefore.getNetworkNS() != null || podsBefore.getHostIpAddress() != null) {
139 // Case where pod is moving from one namespace to another
140 // issue a delete of all previous configuration, and add the new one.
141 //jobCoordinator.enqueueJob(podsAfter.getName(), new PodConfigRemoveWorker(txRunner, podsBefore));
143 jobCoordinator.enqueueJob(podsAfter.getName(), new PodConfigAddWorker(txRunner, coeUtils,
144 instanceIdentifier, podsAfter, podInterfaceAfter));
148 private void remove(Pods pods, Interface podInterface) {
149 LOG.trace("Pod removed {}", pods);
150 if (pods.getNetworkNS() == null || pods.getHostIpAddress() == null) {
151 LOG.warn("pod {} deletion without a valid network id", podInterface.getUid().getValue());
155 jobCoordinator.enqueueJob(pods.getName(), new PodConfigRemoveWorker(txRunner, coeUtils, pods));
158 private static class PodConfigAddWorker implements Callable<List<ListenableFuture<Void>>> {
159 InstanceIdentifier<Pods> podsInstanceIdentifier;
160 private final Pods pods;
161 private final Interface podInterface;
162 private final ManagedNewTransactionRunner txRunner;
163 private final CoeUtils coeUtils;
165 PodConfigAddWorker(ManagedNewTransactionRunner txRunner, CoeUtils coeUtils,
166 InstanceIdentifier<Pods> podsInstanceIdentifier,
167 Pods pods, Interface podInterface) {
169 this.podInterface = podInterface;
170 this.txRunner = txRunner;
171 this.podsInstanceIdentifier = podsInstanceIdentifier;
172 this.coeUtils = coeUtils;
176 public List<ListenableFuture<Void>> call() {
177 LOG.trace("Adding Pod : {}", podInterface);
178 String interfaceName = coeUtils.buildInterfaceName(pods.getClusterId().getValue(), pods.getName());
179 List<ListenableFuture<Void>> futures = new ArrayList<>();
180 futures.add(txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION, tx -> {
181 String nodeIp = pods.getHostIpAddress().stringValue();
182 ElanInstance elanInstance = coeUtils.createElanInstanceForTheFirstPodInTheNetwork(
183 pods.getClusterId().getValue(), nodeIp, podInterface, tx);
184 LOG.info("interface creation for pod {}", interfaceName);
185 String portInterfaceName = coeUtils.createOfPortInterface(interfaceName, tx);
186 LOG.debug("Creating ELAN Interface for pod {}", interfaceName);
187 coeUtils.createElanInterface(portInterfaceName,
188 elanInstance.getElanInstanceName(), tx);
190 if (podInterface.getIpAddress() != null) {
191 futures.add(txRunner.callWithNewWriteOnlyTransactionAndSubmit(OPERATIONAL, tx -> {
192 coeUtils.createPodNameToPodUuidMap(interfaceName, podsInstanceIdentifier, tx);
199 private static class PodConfigRemoveWorker implements Callable<List<ListenableFuture<Void>>> {
200 private final Pods pods;
201 private final ManagedNewTransactionRunner txRunner;
202 private final CoeUtils coeUtils;
204 PodConfigRemoveWorker(ManagedNewTransactionRunner txRunner, CoeUtils coeUtils,
207 this.txRunner = txRunner;
208 this.coeUtils = coeUtils;
212 public List<ListenableFuture<Void>> call() {
213 List<ListenableFuture<Void>> futures = new ArrayList<>();
214 String podInterfaceName = coeUtils.buildInterfaceName(pods.getClusterId().getValue(), pods.getName());
215 futures.add(txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION, tx -> {
216 LOG.trace("Deleting Pod : {}", podInterfaceName);
217 LOG.debug("Deleting VPN Interface for pod {}", podInterfaceName);
218 coeUtils.deleteVpnInterface(podInterfaceName, tx);
219 LOG.debug("Deleting ELAN Interface for pod {}", podInterfaceName);
220 coeUtils.deleteElanInterface(podInterfaceName, tx);
221 LOG.info("interface deletion for pod {}", podInterfaceName);
222 coeUtils.deleteOfPortInterface(podInterfaceName, tx);
223 coeUtils.unbindKubeProxyService(podInterfaceName, tx);
225 futures.add(txRunner.callWithNewWriteOnlyTransactionAndSubmit(OPERATIONAL, tx -> {
226 coeUtils.deletePodNameToPodUuidMap(podInterfaceName, tx);