Merge "Add ovsdb/netvirt ui to vpnservice netvirt code."
[netvirt.git] / vpnservice / bgpmanager / bgpmanager-impl / src / main / java / org / opendaylight / netvirt / bgpmanager / BgpUtil.java
1 /*
2  * Copyright (c) 2015 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netvirt.bgpmanager;
9
10 import com.google.common.base.Optional;
11 import com.google.common.util.concurrent.CheckedFuture;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ThreadFactoryBuilder;
15 import org.opendaylight.controller.md.sal.binding.api.*;
16 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
17 import org.opendaylight.controller.md.sal.common.api.data.*;
18 import org.opendaylight.genius.utils.batching.ActionableResource;
19 import org.opendaylight.genius.utils.batching.ActionableResourceImpl;
20 import org.opendaylight.genius.utils.batching.ResourceBatchingManager;
21 import org.opendaylight.genius.utils.batching.ResourceHandler;
22 import org.opendaylight.yangtools.yang.binding.DataObject;
23 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26
27 import java.util.concurrent.*;
28 import java.util.concurrent.atomic.AtomicInteger;
29
30 public class BgpUtil {
31     private static final Logger LOG = LoggerFactory.getLogger(BgpUtil.class);
32     private static DataBroker  dataBroker;
33     private static BindingTransactionChain fibTransact;
34     public static final int PERIODICITY = 500;
35     private static AtomicInteger pendingWrTransaction = new AtomicInteger(0);
36     public static final int BATCH_SIZE = 1000;
37     public static Integer batchSize;
38     public static Integer batchInterval;
39     private static int txChainAttempts = 0;
40
41     private static BlockingQueue<ActionableResource> bgpResourcesBufferQ = new LinkedBlockingQueue<>();
42
43     // return number of pending Write Transactions with BGP-Util (no read)
44     public static int getGetPendingWrTransaction() {
45         return pendingWrTransaction.get();
46     }
47
48     static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
49             .setNameFormat("bgp-util-mdsal-%d").build();
50
51     static ExecutorService threadPool = Executors.newFixedThreadPool(1, namedThreadFactory);
52
53
54     static synchronized BindingTransactionChain getTransactionChain() {
55         return fibTransact;
56     }
57
58     static void registerWithBatchManager(ResourceHandler resourceHandler) {
59         ResourceBatchingManager resBatchingManager = ResourceBatchingManager.getInstance();
60         resBatchingManager.registerBatchableResource("BGP-RESOURCES", bgpResourcesBufferQ, resourceHandler);
61     }
62
63     static <T extends DataObject> void update(DataBroker broker, final LogicalDatastoreType datastoreType,
64                                               final InstanceIdentifier<T> path, final T data) {
65         ActionableResource actResource = new ActionableResourceImpl(path.toString());
66         actResource.setAction(ActionableResource.UPDATE);
67         actResource.setInstanceIdentifier(path);
68         actResource.setInstance(data);
69         bgpResourcesBufferQ.add(actResource);
70     }
71
72     public static <T extends DataObject> void write(DataBroker broker, final LogicalDatastoreType datastoreType,
73                                                     final InstanceIdentifier<T> path, final T data) {
74         ActionableResource actResource = new ActionableResourceImpl(path.toString());
75         actResource.setAction(ActionableResource.CREATE);
76         actResource.setInstanceIdentifier(path);
77         actResource.setInstance(data);
78         bgpResourcesBufferQ.add(actResource);
79     }
80
81     static <T extends DataObject> void delete(DataBroker broker, final LogicalDatastoreType datastoreType,
82                                               final InstanceIdentifier<T> path) {
83         ActionableResource actResource = new ActionableResourceImpl(path.toString());
84         actResource.setAction(ActionableResource.DELETE);
85         actResource.setInstanceIdentifier(path);
86         actResource.setInstance(null);
87         bgpResourcesBufferQ.add(actResource);
88     }
89
90
91     public static <T extends DataObject> Optional<T> read(DataBroker broker, LogicalDatastoreType datastoreType,
92                                                           InstanceIdentifier<T> path)
93             throws ExecutionException, InterruptedException, TimeoutException {
94
95         ReadTransaction tx = broker.newReadOnlyTransaction();
96         CheckedFuture<?,?> result = tx.read(datastoreType, path);
97
98         try {
99             return (Optional<T>) result.get();
100         } catch (Exception e) {
101             LOG.error("DataStore  read exception {} ", e);
102         }
103         return Optional.absent();
104     }
105
106     public static void setBroker(final DataBroker broker) {
107         BgpUtil.dataBroker = broker;
108         initTransactionChain();
109     }
110
111     static synchronized void initTransactionChain() {
112         try {
113             if (fibTransact != null) {
114                 fibTransact.close();
115                 LOG.error("*** TxChain Close, *** Attempts: {}", txChainAttempts);
116                 fibTransact = null;
117             }
118         } catch (Exception ignore) {
119         }
120         BgpUtil.fibTransact = dataBroker.createTransactionChain(new BgpUtilTransactionChainListener());
121         txChainAttempts++;
122     }
123
124     static class BgpUtilTransactionChainListener implements TransactionChainListener {
125         @Override
126         public void onTransactionChainFailed(TransactionChain<?, ?> transactionChain, AsyncTransaction<?, ?> asyncTransaction, Throwable throwable) {
127             LOG.error("*** TxChain Creation Failed *** Attempts: {}", txChainAttempts);
128             initTransactionChain();
129         }
130
131         @Override
132         public void onTransactionChainSuccessful(TransactionChain<?, ?> transactionChain) {
133             LOG.trace("TxChain Creation Success");
134         }
135     }
136
137     public static DataBroker getBroker() {
138         return dataBroker;
139     }
140 }