Bug 7192 - Inter-VPN link routes BGP leaking not working
[netvirt.git] / vpnservice / bgpmanager / bgpmanager-impl / src / main / java / org / opendaylight / netvirt / bgpmanager / BgpUtil.java
1 /*
2  * Copyright (c) 2015 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netvirt.bgpmanager;
9
10 import com.google.common.base.Optional;
11 import com.google.common.util.concurrent.CheckedFuture;
12 import com.google.common.util.concurrent.ThreadFactoryBuilder;
13 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
14 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
15 import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
16 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
17 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
18 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
19 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
20 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
21 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
22 import org.opendaylight.genius.utils.batching.ActionableResource;
23 import org.opendaylight.genius.utils.batching.ActionableResourceImpl;
24 import org.opendaylight.genius.utils.batching.ResourceBatchingManager;
25 import org.opendaylight.genius.utils.batching.ResourceHandler;
26 import org.opendaylight.yangtools.yang.binding.DataObject;
27 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 import java.util.concurrent.BlockingQueue;
32 import java.util.concurrent.ExecutionException;
33 import java.util.concurrent.Executors;
34 import java.util.concurrent.ExecutorService;
35 import java.util.concurrent.LinkedBlockingQueue;
36 import java.util.concurrent.TimeoutException;
37 import java.util.concurrent.ThreadFactory;
38 import java.util.concurrent.atomic.AtomicInteger;
39
40 public class BgpUtil {
41     private static final Logger LOG = LoggerFactory.getLogger(BgpUtil.class);
42     private static DataBroker  dataBroker;
43     private static BindingTransactionChain fibTransact;
44     public static final int PERIODICITY = 500;
45     private static AtomicInteger pendingWrTransaction = new AtomicInteger(0);
46     public static final int BATCH_SIZE = 1000;
47     public static Integer batchSize;
48     public static Integer batchInterval;
49     private static int txChainAttempts = 0;
50
51     private static BlockingQueue<ActionableResource> bgpResourcesBufferQ = new LinkedBlockingQueue<>();
52
53     // return number of pending Write Transactions with BGP-Util (no read)
54     public static int getGetPendingWrTransaction() {
55         return pendingWrTransaction.get();
56     }
57
58     static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
59             .setNameFormat("bgp-util-mdsal-%d").build();
60
61     static ExecutorService threadPool = Executors.newFixedThreadPool(1, namedThreadFactory);
62
63
64     static synchronized BindingTransactionChain getTransactionChain() {
65         return fibTransact;
66     }
67
68     static void registerWithBatchManager(ResourceHandler resourceHandler) {
69         ResourceBatchingManager resBatchingManager = ResourceBatchingManager.getInstance();
70         resBatchingManager.registerBatchableResource("BGP-RESOURCES", bgpResourcesBufferQ, resourceHandler);
71     }
72
73     static <T extends DataObject> void update(DataBroker broker, final LogicalDatastoreType datastoreType,
74                                               final InstanceIdentifier<T> path, final T data) {
75         ActionableResource actResource = new ActionableResourceImpl(path.toString());
76         actResource.setAction(ActionableResource.UPDATE);
77         actResource.setInstanceIdentifier(path);
78         actResource.setInstance(data);
79         bgpResourcesBufferQ.add(actResource);
80     }
81
82     public static <T extends DataObject> void write(DataBroker broker, final LogicalDatastoreType datastoreType,
83                                                     final InstanceIdentifier<T> path, final T data) {
84         ActionableResource actResource = new ActionableResourceImpl(path.toString());
85         actResource.setAction(ActionableResource.CREATE);
86         actResource.setInstanceIdentifier(path);
87         actResource.setInstance(data);
88         bgpResourcesBufferQ.add(actResource);
89     }
90
91     static <T extends DataObject> void delete(DataBroker broker, final LogicalDatastoreType datastoreType,
92                                               final InstanceIdentifier<T> path) {
93         ActionableResource actResource = new ActionableResourceImpl(path.toString());
94         actResource.setAction(ActionableResource.DELETE);
95         actResource.setInstanceIdentifier(path);
96         actResource.setInstance(null);
97         bgpResourcesBufferQ.add(actResource);
98     }
99
100
101     public static <T extends DataObject> Optional<T> read(DataBroker broker, LogicalDatastoreType datastoreType,
102                                                           InstanceIdentifier<T> path)
103             throws ExecutionException, InterruptedException, TimeoutException {
104
105         ReadTransaction tx = broker.newReadOnlyTransaction();
106         CheckedFuture<?,?> result = tx.read(datastoreType, path);
107
108         try {
109             return (Optional<T>) result.get();
110         } catch (Exception e) {
111             LOG.error("DataStore  read exception {} ", e);
112         }
113         return Optional.absent();
114     }
115
116     public static <T extends DataObject> void syncWrite(DataBroker broker, LogicalDatastoreType datastoreType,
117                                                         InstanceIdentifier<T> path, T data) {
118         WriteTransaction tx = broker.newWriteOnlyTransaction();
119         tx.put(datastoreType, path, data, true);
120         CheckedFuture<Void, TransactionCommitFailedException> futures = tx.submit();
121         try {
122             futures.get();
123         } catch (InterruptedException | ExecutionException e) {
124             LOG.error("Error writing to datastore (path, data) : ({}, {})", path, data);
125             throw new RuntimeException(e.getMessage());
126         }
127     }
128
129     public static void setBroker(final DataBroker broker) {
130         BgpUtil.dataBroker = broker;
131         initTransactionChain();
132     }
133
134     static synchronized void initTransactionChain() {
135         try {
136             if (fibTransact != null) {
137                 fibTransact.close();
138                 LOG.error("*** TxChain Close, *** Attempts: {}", txChainAttempts);
139                 fibTransact = null;
140             }
141         } catch (Exception ignore) {
142         }
143         BgpUtil.fibTransact = dataBroker.createTransactionChain(new BgpUtilTransactionChainListener());
144         txChainAttempts++;
145     }
146
147     static class BgpUtilTransactionChainListener implements TransactionChainListener {
148         @Override
149         public void onTransactionChainFailed(TransactionChain<?, ?> transactionChain, AsyncTransaction<?, ?> asyncTransaction, Throwable throwable) {
150             LOG.error("*** TxChain Creation Failed *** Attempts: {}", txChainAttempts);
151             initTransactionChain();
152         }
153
154         @Override
155         public void onTransactionChainSuccessful(TransactionChain<?, ?> transactionChain) {
156             LOG.trace("TxChain Creation Success");
157         }
158     }
159
160     public static DataBroker getBroker() {
161         return dataBroker;
162     }
163 }