2 * Copyright (c) 2015 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netvirt.bgpmanager;
10 import com.google.common.base.Optional;
11 import com.google.common.util.concurrent.CheckedFuture;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ThreadFactoryBuilder;
15 import org.opendaylight.controller.md.sal.binding.api.*;
16 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
17 import org.opendaylight.controller.md.sal.common.api.data.*;
18 import org.opendaylight.genius.utils.batching.ActionableResource;
19 import org.opendaylight.genius.utils.batching.ActionableResourceImpl;
20 import org.opendaylight.genius.utils.batching.ResourceBatchingManager;
21 import org.opendaylight.genius.utils.batching.ResourceHandler;
22 import org.opendaylight.yangtools.yang.binding.DataObject;
23 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
27 import java.util.concurrent.*;
28 import java.util.concurrent.atomic.AtomicInteger;
30 public class BgpUtil {
// Class-wide logger.
31 private static final Logger LOG = LoggerFactory.getLogger(BgpUtil.class);
// Data broker handed in through setBroker(); used to create the transaction chain.
32 private static DataBroker dataBroker;
// Transaction chain created by initTransactionChain().
33 private static BindingTransactionChain fibTransact;
// NOTE(review): not referenced in the visible part of the file; presumably a batching period in milliseconds - confirm.
34 public static final int PERIODICITY = 500;
// Counter exposed through getGetPendingWrTransaction(); no updates to it are visible in this part of the file.
35 private static AtomicInteger pendingWrTransaction = new AtomicInteger(0);
// NOTE(review): not referenced in the visible part of the file; presumably the default batch size - confirm.
36 public static final int BATCH_SIZE = 1000;
// Public, mutable settings; they are not assigned anywhere in the visible part of the file.
37 public static Integer batchSize;
38 public static Integer batchInterval;
// Value reported in the transaction-chain log messages.
39 private static int txChainAttempts = 0;
// Queue that update()/write()/delete() enqueue onto; registerWithBatchManager() registers it as "BGP-RESOURCES".
41 private static BlockingQueue<ActionableResource> bgpResourcesBufferQ = new LinkedBlockingQueue<>();
43 // return number of pending Write Transactions with BGP-Util (no read)
44 public static int getGetPendingWrTransaction() {
45 return pendingWrTransaction.get();
// Thread factory whose name format is "bgp-util-mdsal-%d".
48 static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
49 .setNameFormat("bgp-util-mdsal-%d").build();
// Fixed-size pool of one thread built from the factory above.
// No task is submitted to it in the visible part of the file.
51 static ExecutorService threadPool = Executors.newFixedThreadPool(1, namedThreadFactory);
// Accessor for the BGP transaction chain. It is static and synchronized, like
// initTransactionChain(), so both methods lock on the BgpUtil class.
// NOTE(review): the method body is not visible in this excerpt; presumably it returns
// the fibTransact field - confirm against the full file.
54 static synchronized BindingTransactionChain getTransactionChain() {
58 static void registerWithBatchManager(ResourceHandler resourceHandler) {
59 ResourceBatchingManager resBatchingManager = ResourceBatchingManager.getInstance();
60 resBatchingManager.registerBatchableResource("BGP-RESOURCES", bgpResourcesBufferQ, resourceHandler);
63 static <T extends DataObject> void update(DataBroker broker, final LogicalDatastoreType datastoreType,
64 final InstanceIdentifier<T> path, final T data) {
65 ActionableResource actResource = new ActionableResourceImpl(path.toString());
66 actResource.setAction(ActionableResource.UPDATE);
67 actResource.setInstanceIdentifier(path);
68 actResource.setInstance(data);
69 bgpResourcesBufferQ.add(actResource);
72 public static <T extends DataObject> void write(DataBroker broker, final LogicalDatastoreType datastoreType,
73 final InstanceIdentifier<T> path, final T data) {
74 ActionableResource actResource = new ActionableResourceImpl(path.toString());
75 actResource.setAction(ActionableResource.CREATE);
76 actResource.setInstanceIdentifier(path);
77 actResource.setInstance(data);
78 bgpResourcesBufferQ.add(actResource);
81 static <T extends DataObject> void delete(DataBroker broker, final LogicalDatastoreType datastoreType,
82 final InstanceIdentifier<T> path) {
83 ActionableResource actResource = new ActionableResourceImpl(path.toString());
84 actResource.setAction(ActionableResource.DELETE);
85 actResource.setInstanceIdentifier(path);
86 actResource.setInstance(null);
87 bgpResourcesBufferQ.add(actResource);
91 public static <T extends DataObject> Optional<T> read(DataBroker broker, LogicalDatastoreType datastoreType,
92 InstanceIdentifier<T> path)
93 throws ExecutionException, InterruptedException, TimeoutException {
95 ReadTransaction tx = broker.newReadOnlyTransaction();
96 CheckedFuture<?,?> result = tx.read(datastoreType, path);
99 return (Optional<T>) result.get();
100 } catch (Exception e) {
101 LOG.error("DataStore read exception {} ", e);
103 return Optional.absent();
106 public static void setBroker(final DataBroker broker) {
107 BgpUtil.dataBroker = broker;
108 initTransactionChain();
// Creates a transaction chain from the data broker, with a new
// BgpUtilTransactionChainListener attached, and stores it in fibTransact.
// The method is static and synchronized.
// NOTE(review): several lines of this method are missing from this excerpt (the start of
// the try block and the statements around the log call), so what is done with an already
// existing chain cannot be confirmed from here.
111 static synchronized void initTransactionChain() {
// This branch is taken only when a chain already exists.
113 if (fibTransact != null) {
115 LOG.error("*** TxChain Close, *** Attempts: {}", txChainAttempts);
// The exception variable is named "ignore"; the body of the catch is not visible here.
118 } catch (Exception ignore) {
// Install a chain freshly obtained from the data broker.
120 BgpUtil.fibTransact = dataBroker.createTransactionChain(new BgpUtilTransactionChainListener());
124 static class BgpUtilTransactionChainListener implements TransactionChainListener {
126 public void onTransactionChainFailed(TransactionChain<?, ?> transactionChain, AsyncTransaction<?, ?> asyncTransaction, Throwable throwable) {
127 LOG.error("*** TxChain Creation Failed *** Attempts: {}", txChainAttempts);
128 initTransactionChain();
132 public void onTransactionChainSuccessful(TransactionChain<?, ?> transactionChain) {
133 LOG.trace("TxChain Creation Success");
137 public static DataBroker getBroker() {