import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.genius.datastoreutils.AsyncDataTreeChangeListenerBase;
+import org.opendaylight.genius.datastoreutils.DataStoreJobCoordinator;
import org.opendaylight.yang.gen.v1.urn.huawei.params.xml.ns.yang.l3vpn.rev140815.VpnAfConfig;
import org.opendaylight.yang.gen.v1.urn.huawei.params.xml.ns.yang.l3vpn.rev140815.VpnInstances;
import org.opendaylight.yang.gen.v1.urn.huawei.params.xml.ns.yang.l3vpn.rev140815.vpn.instances.VpnInstance;
public class ArpScheduler extends AsyncDataTreeChangeListenerBase<VpnPortipToPort,ArpScheduler> {
- private static final Logger logger = LoggerFactory.getLogger(ArpScheduler.class);
private ScheduledExecutorService executorService;
private OdlInterfaceRpcService interfaceRpc;
private DataBroker dataBroker;
scheduleExpiredEntryDrainerTask();
}
- public void addOrUpdateMacEntryToQueue(String vpnName, MacAddress macAddress, InetAddress InetAddress, String interfaceName) {
- MacEntry newMacEntry = new MacEntry(ArpConstants.arpCacheTimeout,vpnName,macAddress, InetAddress,interfaceName );
- if (!macEntryQueue.contains(newMacEntry)) {
- LOG.info("Adding ARP cache");
- macEntryQueue.offer(newMacEntry);
- }
- else{
- LOG.info("Updating ARP cache");
- macEntryQueue.remove(newMacEntry);
- macEntryQueue.offer(newMacEntry); }
- }
-
private void scheduleExpiredEntryDrainerTask() {
LOG.info("Scheduling expired entry drainer task");
}
- private class ExpiredEntryDrainerTask implements Runnable {
- @Override
- public void run() {
- WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
- Collection<MacEntry> expiredMacEntries = new ArrayList<>();
- macEntryQueue.drainTo(expiredMacEntries);
- for (MacEntry macEntry: expiredMacEntries) {
- LOG.info("Removing the ARP cache for"+macEntry);
- InstanceIdentifier<VpnPortipToPort> id = getVpnPortipToPortInstanceOpDataIdentifier(macEntry.getIpAddress().getHostAddress(),macEntry.getVpnName());
- Optional<VpnPortipToPort> vpnPortipToPort = VpnUtil.read(dataBroker, LogicalDatastoreType.CONFIGURATION, id);
- if (vpnPortipToPort.isPresent()) {
- VpnPortipToPort vpnPortipToPortold = vpnPortipToPort.get();
- String fixedip = vpnPortipToPortold.getPortFixedip();
- String vpnName = vpnPortipToPortold.getVpnName();
- String interfaceName = vpnPortipToPortold.getPortName();
- String rd = getRouteDistinguisher(vpnName);
- deleteVrfEntries(rd,fixedip,tx);
- deleteAdjacencies(fixedip,vpnName,interfaceName,tx);
- tx.delete(LogicalDatastoreType.CONFIGURATION, id);
- waitForTransactionToComplete(tx);
- }
-
- }
- }
- }
- private void deleteVrfEntries(String rd, String fixedip, WriteTransaction tx) {
- InstanceIdentifier<VrfEntry> vrfid= InstanceIdentifier.builder(FibEntries.class).
- child(VrfTables.class, new VrfTablesKey(rd)).
- child(VrfEntry.class,new VrfEntryKey(iptoprefix(fixedip))).
- build();
-
- tx.delete(LogicalDatastoreType.CONFIGURATION, vrfid);
- LOG.info("deleting the vrf entries");
-
-
- }
-
+ private class ExpiredEntryDrainerTask implements Runnable {
+ @Override
+ public void run() {
+ Collection<MacEntry> expiredMacEntries = new ArrayList<>();
+ macEntryQueue.drainTo(expiredMacEntries);
+ for (MacEntry macEntry: expiredMacEntries) {
+ LOG.info("Removing the ARP cache for"+macEntry);
+ InstanceIdentifier<VpnPortipToPort> id = getVpnPortipToPortInstanceOpDataIdentifier(macEntry.getIpAddress().getHostAddress(),macEntry.getVpnName());
+ Optional<VpnPortipToPort> vpnPortipToPort = VpnUtil.read(dataBroker, LogicalDatastoreType.CONFIGURATION, id);
+ if (vpnPortipToPort.isPresent()) {
+ VpnPortipToPort vpnPortipToPortold = vpnPortipToPort.get();
+ String fixedip = vpnPortipToPortold.getPortFixedip();
+ String vpnName = vpnPortipToPortold.getVpnName();
+ String interfaceName = vpnPortipToPortold.getPortName();
+ String rd = getRouteDistinguisher(vpnName);
+ DataStoreJobCoordinator coordinator = DataStoreJobCoordinator.getInstance();
+ coordinator.enqueueJob(buildJobKey(fixedip,vpnName), new ArpremovechacheTask(dataBroker,fixedip, vpnName,interfaceName, rd,id));
+ }
- public void deleteAdjacencies(String fixedip, String vpnName, String interfaceName, WriteTransaction tx) {
- InstanceIdentifier<VpnInterface> vpnIfId = VpnUtil.getVpnInterfaceIdentifier(interfaceName);
- InstanceIdentifier<Adjacencies> path = vpnIfId.augmentation(Adjacencies.class);
- Optional<Adjacencies> adjacencies = VpnUtil.read(dataBroker, LogicalDatastoreType.CONFIGURATION, path);
- if (adjacencies.isPresent()) {
- List<Adjacency> adjacencyList = adjacencies.get().getAdjacency();
- InstanceIdentifier <Adjacency> adid = vpnIfId.augmentation(Adjacencies.class).child(Adjacency.class, new AdjacencyKey(iptoprefix(fixedip)));
- Optional<Adjacency> newAdj = VpnUtil.read(dataBroker, LogicalDatastoreType.CONFIGURATION, adid);
- if(adjacencyList.contains(newAdj.get()))
- adjacencyList.remove(newAdj.get());
- Adjacencies aug = VpnUtil.getVpnInterfaceAugmentation(adjacencyList);
- VpnInterface newVpnIntf = new VpnInterfaceBuilder().setKey(new VpnInterfaceKey(interfaceName)).
- setName(interfaceName).setVpnInstanceName(vpnName).addAugmentation(Adjacencies.class, aug).build();
- tx.put(LogicalDatastoreType.CONFIGURATION, vpnIfId, newVpnIntf,true);
- LOG.info("deleting the adjacencies ");
}
+ }
}
- public static void waitForTransactionToComplete(WriteTransaction tx) {
- CheckedFuture<Void, TransactionCommitFailedException> futures = tx.submit();
- try {
- futures.get();
- } catch (InterruptedException | ExecutionException e) {
- logger.error("Error writing to datastore {}", e);
- }
- }
- private String iptoprefix(String ip){
- return new StringBuilder(ip).append(ArpConstants.PREFIX).toString();
-
- }
- private static final FutureCallback<Void> DEFAULT_CALLBACK =
- new FutureCallback<Void>() {
- @Override
- public synchronized void onSuccess(Void result) {
- LOG.debug("Success in Datastore operation");
- }
-
- @Override
- public void onFailure(Throwable error) {
- LOG.error("Error in Datastore operation", error);
- };
- };
private String getRouteDistinguisher(String vpnName) {
InstanceIdentifier<VpnInstance> id = InstanceIdentifier.builder(VpnInstances.class)
.child(VpnInstance.class, new VpnInstanceKey(vpnName)).build();
MacAddress srcMacAddress = MacAddress.getDefaultInstance(value.getMacAddress());
String vpnName = value.getVpnName();
String interfaceName = value.getPortName();
- addOrUpdateMacEntryToQueue(vpnName,srcMacAddress, srcInetAddr, interfaceName);
- } catch (Exception e) {
+ Boolean islearnt = value.isLearnt();
+ if(islearnt)
+ {
+ DataStoreJobCoordinator coordinator = DataStoreJobCoordinator.getInstance();
+ coordinator.enqueueJob(buildJobKey(srcInetAddr.toString(),vpnName), new ArpaddchacheTask(srcInetAddr, srcMacAddress, vpnName,interfaceName, macEntryQueue));
+ }
+ } catch (Exception e) {
LOG.error("Error in deserializing packet {} with exception {}", value, e);
- e.printStackTrace();
- }
+ e.printStackTrace();
+ }
}
Boolean islearnt = value.isLearnt();
if(islearnt)
{
- addOrUpdateMacEntryToQueue(vpnName,srcMacAddress, srcInetAddr, interfaceName);
+ DataStoreJobCoordinator coordinator = DataStoreJobCoordinator.getInstance();
+ if(islearnt)
+ {
+ coordinator.enqueueJob(buildJobKey(srcInetAddr.toString(),vpnName), new ArpaddchacheTask(srcInetAddr, srcMacAddress, vpnName,interfaceName, macEntryQueue));
+ }
}
}
catch (Exception e) {
// TODO Auto-generated method stub
}
+
+ private String buildJobKey(String ip, String vpnName){
+ return new StringBuilder(ArpConstants.ARPJOB).append(ip).append(vpnName).toString();
+
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netvirt.vpnmanager;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.DelayQueue;
+
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.MacAddress;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+public class ArpaddchacheTask implements Callable<List<ListenableFuture<Void>>> {
+ InetAddress srcInetAddr;
+ MacAddress srcMacAddress;
+ String vpnName;
+ String interfaceName;
+ DelayQueue<MacEntry> macEntryQueue;
+ private static final Logger LOG = LoggerFactory.getLogger(ArpaddchacheTask.class);
+
+ public ArpaddchacheTask(InetAddress srcInetAddr, MacAddress srcMacAddress, String vpnName, String interfaceName,
+ DelayQueue<MacEntry> macEntryQueue) {
+ super();
+ this.srcInetAddr = srcInetAddr;
+ this.srcMacAddress = srcMacAddress;
+ this.vpnName = vpnName;
+ this.interfaceName = interfaceName;
+ this.macEntryQueue = macEntryQueue;
+ }
+
+
+
+ @Override
+ public List<ListenableFuture<Void>> call() throws Exception {
+ List<ListenableFuture<Void>> futures = new ArrayList<>();
+ addOrUpdateMacEntryToQueue(vpnName,srcMacAddress, srcInetAddr, interfaceName);
+ return futures;
+ }
+
+ public void addOrUpdateMacEntryToQueue(String vpnName, MacAddress macAddress,InetAddress InetAddress, String interfaceName) {
+ MacEntry newMacEntry = new MacEntry(ArpConstants.arpCacheTimeout,vpnName,macAddress, InetAddress,interfaceName );
+ if (!macEntryQueue.contains(newMacEntry)) {
+ LOG.info("Adding ARP cache");
+ macEntryQueue.offer(newMacEntry);
+ }
+ else{
+ macEntryQueue.remove(newMacEntry);
+ macEntryQueue.offer(newMacEntry);
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netvirt.vpnmanager;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.yang.gen.v1.urn.huawei.params.xml.ns.yang.l3vpn.rev140815.vpn.interfaces.VpnInterface;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.l3vpn.rev130911.Adjacencies;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.l3vpn.rev130911.adjacency.list.Adjacency;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.l3vpn.rev130911.adjacency.list.AdjacencyKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.fibmanager.rev150330.FibEntries;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.fibmanager.rev150330.fibentries.VrfTables;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.fibmanager.rev150330.fibentries.VrfTablesKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.fibmanager.rev150330.vrfentries.VrfEntry;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.fibmanager.rev150330.vrfentries.VrfEntryKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.neutronvpn.rev150602.neutron.vpn.portip.port.data.VpnPortipToPort;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.ListenableFuture;
+
+public class ArpremovechacheTask implements Callable<List<ListenableFuture<Void>>> {
+ DataBroker dataBroker;
+ String fixedip;
+ String vpnName;
+ String interfaceName;
+ String rd;
+ InstanceIdentifier<VpnPortipToPort> id;
+ private static final Logger LOG = LoggerFactory.getLogger(ArpremovechacheTask.class);
+
+ public ArpremovechacheTask(DataBroker dataBroker, String fixedip, String vpnName, String interfaceName, String rd,
+ InstanceIdentifier<VpnPortipToPort> id) {
+ super();
+ this.fixedip = fixedip;
+ this.vpnName = vpnName;
+ this.interfaceName = interfaceName;
+ this.rd = rd;
+ this.dataBroker = dataBroker;
+ this.id = id;
+ }
+
+ @Override
+ public List<ListenableFuture<Void>> call() throws Exception {
+ WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
+ List<ListenableFuture<Void>> result = new ArrayList<ListenableFuture<Void>>();
+ deleteVrfEntries(rd,fixedip,tx);
+ deleteAdjacencies(fixedip,vpnName,interfaceName,tx);
+ tx.delete(LogicalDatastoreType.CONFIGURATION, id);
+ CheckedFuture<Void, TransactionCommitFailedException> futures = tx.submit();
+ try {
+ futures.get();
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("Error writing to datastore {}", e);
+ }
+ result.add(futures);
+ return result;
+
+ }
+
+ private void deleteVrfEntries(String rd, String fixedip, WriteTransaction tx) {
+ InstanceIdentifier<VrfEntry> vrfid= InstanceIdentifier.builder(FibEntries.class).
+ child(VrfTables.class, new VrfTablesKey(rd)).
+ child(VrfEntry.class,new VrfEntryKey(iptoprefix(fixedip))).
+ build();
+
+ tx.delete(LogicalDatastoreType.CONFIGURATION, vrfid);
+ }
+
+
+ public void deleteAdjacencies(String fixedip, String vpnName, String interfaceName, WriteTransaction tx) {
+ InstanceIdentifier<VpnInterface> vpnIfId = VpnUtil.getVpnInterfaceIdentifier(interfaceName);
+ InstanceIdentifier<Adjacencies> path = vpnIfId.augmentation(Adjacencies.class);
+ Optional<Adjacencies> adjacencies = VpnUtil.read(dataBroker, LogicalDatastoreType.CONFIGURATION, path);
+ if (adjacencies.isPresent()) {
+ InstanceIdentifier <Adjacency> adid = vpnIfId.augmentation(Adjacencies.class).child(Adjacency.class, new AdjacencyKey(iptoprefix(fixedip)));
+ tx.delete(LogicalDatastoreType.CONFIGURATION, adid);
+ LOG.info("deleting the adjacencies ");
+ }
+ }
+
+ private String iptoprefix(String ip){
+ return new StringBuilder(ip).append(ArpConstants.PREFIX).toString();
+
+ }
+}