package org.opendaylight.protocol.data.change.counter;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
import java.util.Collection;
-import java.util.concurrent.atomic.AtomicLong;
-
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.LongAdder;
import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.data.change.counter.rev160315.DataChangeCounter;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.data.change.counter.rev160315.data.change.counter.Counter;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.data.change.counter.rev160315.data.change.counter.CounterBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-final class TopologyDataChangeCounter implements ClusteredDataTreeChangeListener<Topology>, TransactionChainListener, AutoCloseable {
+final class TopologyDataChangeCounter implements ClusteredDataTreeChangeListener<Topology>,
+ TransactionChainListener, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(TopologyDataChangeCounter.class);
private final DataBroker dataBroker;
private final String counterId;
private final InstanceIdentifier<Counter> counterInstanceId;
- private final BindingTransactionChain chain;
- private final AtomicLong count;
+ private final LongAdder count = new LongAdder();
private final ListenerRegistration<TopologyDataChangeCounter> registration;
- private final String topologyName;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+ private BindingTransactionChain transactionChain;
- public TopologyDataChangeCounter(final DataBroker dataBroker, final String counterId, final String topologyName) {
+ TopologyDataChangeCounter(final DataBroker dataBroker, final String counterId, final String topologyName) {
this.dataBroker = dataBroker;
- this.topologyName = topologyName;
- this.chain = this.dataBroker.createTransactionChain(this);
+ this.transactionChain = this.dataBroker.createTransactionChain(this);
this.counterId = counterId;
this.counterInstanceId = InstanceIdentifier.builder(DataChangeCounter.class)
- .child(Counter.class, new CounterKey(this.counterId)).build();
- this.count = new AtomicLong(0);
- putCount(this.count.get());
+ .child(Counter.class, new CounterKey(this.counterId)).build();
+ putCount(this.count.longValue());
final InstanceIdentifier<Topology> topoIId = InstanceIdentifier.builder(NetworkTopology.class)
- .child(Topology.class, new TopologyKey(new TopologyId(topologyName))).build();
+ .child(Topology.class, new TopologyKey(new TopologyId(topologyName))).build();
this.registration = this.dataBroker.registerDataTreeChangeListener(
- new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL, topoIId), this);
+ new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL, topoIId), this);
LOG.debug("Data change counter {} initiated", this.counterId);
}
@Override
- public void onDataTreeChanged(final Collection<DataTreeModification<Topology>> changes) {
- putCount(this.count.incrementAndGet());
- LOG.debug("Data change #{} for counter {}", this.count.get(), this.counterId);
+ public synchronized void onDataTreeChanged(final Collection<DataTreeModification<Topology>> changes) {
+ this.count.increment();
+ final long inc = this.count.sum();
+ LOG.debug("Data change #{} for counter {}", inc, this.counterId);
+ putCount(inc);
}
@Override
- public void close() {
+ public synchronized void close() {
this.registration.close();
final WriteTransaction wTx = this.dataBroker.newWriteOnlyTransaction();
wTx.delete(LogicalDatastoreType.OPERATIONAL, this.counterInstanceId);
try {
- wTx.submit().checkedGet();
- } catch (final TransactionCommitFailedException except) {
+ wTx.submit().get();
+ } catch (final ExecutionException | InterruptedException except) {
LOG.warn("Error on remove data change counter {}", this.counterId, except);
}
- this.chain.close();
+ this.transactionChain.close();
LOG.debug("Data change counter {} removed", this.counterId);
}
- private void putCount(final long count) {
- final WriteTransaction wTx = this.chain.newWriteOnlyTransaction();
- final Counter counter = new CounterBuilder().setId(this.counterId).setCount(count).build();
+ private void putCount(final long totalCount) {
+ final WriteTransaction wTx = this.transactionChain.newWriteOnlyTransaction();
+ final Counter counter = new CounterBuilder().setId(this.counterId).setCount(totalCount).build();
wTx.put(LogicalDatastoreType.OPERATIONAL, this.counterInstanceId, counter);
- wTx.submit();
+ Futures.addCallback(wTx.submit(), new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(Void result) {
+ LOG.debug("Data change count update stored");
+ }
+
+ @Override
+ public void onFailure(Throwable ex) {
+ LOG.error("Failed to store Data change count");
+ }
+ }, MoreExecutors.directExecutor());
}
@Override
- public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
- final Throwable cause) {
- chain.close();
+ public synchronized void onTransactionChainFailed(final TransactionChain<?, ?> chain,
+ final AsyncTransaction<?, ?> transaction, final Throwable cause) {
LOG.warn("Transaction chain failure. Transaction: {}", transaction, cause);
+ if (!closed.get()) {
+ this.transactionChain.close();
+ this.transactionChain = dataBroker.createTransactionChain(this);
+ }
}
@Override
- public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
+ public synchronized void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
LOG.debug("Transaction chain successful. {}", chain);
}
-
- public String getTopologyName() {
- return this.topologyName;
- }
}