package org.opendaylight.protocol.data.change.counter;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.MoreExecutors;
import java.util.Collection;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.LongAdder;
import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.mdsal.common.api.CommitInfo;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.data.change.counter.rev160315.DataChangeCounter;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.data.change.counter.rev160315.data.change.counter.Counter;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.data.change.counter.rev160315.data.change.counter.CounterBuilder;
private final DataBroker dataBroker;
private final String counterId;
private final InstanceIdentifier<Counter> counterInstanceId;
- private final BindingTransactionChain bindingTx;
- private final AtomicLong count;
+ private final LongAdder count = new LongAdder();
private final ListenerRegistration<TopologyDataChangeCounter> registration;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+ private BindingTransactionChain transactionChain;
TopologyDataChangeCounter(final DataBroker dataBroker, final String counterId, final String topologyName) {
this.dataBroker = dataBroker;
- this.bindingTx = this.dataBroker.createTransactionChain(this);
+ this.transactionChain = this.dataBroker.createTransactionChain(this);
this.counterId = counterId;
this.counterInstanceId = InstanceIdentifier.builder(DataChangeCounter.class)
.child(Counter.class, new CounterKey(this.counterId)).build();
- this.count = new AtomicLong(0);
- putCount(this.count.get());
+ putCount(this.count.longValue());
final InstanceIdentifier<Topology> topoIId = InstanceIdentifier.builder(NetworkTopology.class)
.child(Topology.class, new TopologyKey(new TopologyId(topologyName))).build();
this.registration = this.dataBroker.registerDataTreeChangeListener(
}
@Override
- public void onDataTreeChanged(final Collection<DataTreeModification<Topology>> changes) {
- putCount(this.count.incrementAndGet());
- LOG.debug("Data change #{} for counter {}", this.count.get(), this.counterId);
+ public synchronized void onDataTreeChanged(final Collection<DataTreeModification<Topology>> changes) {
+ this.count.increment();
+ final long inc = this.count.sum();
+ LOG.debug("Data change #{} for counter {}", inc, this.counterId);
+ putCount(inc);
}
@Override
- public void close() {
+ public synchronized void close() {
this.registration.close();
final WriteTransaction wTx = this.dataBroker.newWriteOnlyTransaction();
wTx.delete(LogicalDatastoreType.OPERATIONAL, this.counterInstanceId);
try {
- wTx.submit().checkedGet();
- } catch (final TransactionCommitFailedException except) {
+ wTx.commit().get();
+ } catch (final ExecutionException | InterruptedException except) {
LOG.warn("Error on remove data change counter {}", this.counterId, except);
}
- this.bindingTx.close();
+ this.transactionChain.close();
LOG.debug("Data change counter {} removed", this.counterId);
}
private void putCount(final long totalCount) {
- final WriteTransaction wTx = this.bindingTx.newWriteOnlyTransaction();
+ final WriteTransaction wTx = this.transactionChain.newWriteOnlyTransaction();
final Counter counter = new CounterBuilder().setId(this.counterId).setCount(totalCount).build();
wTx.put(LogicalDatastoreType.OPERATIONAL, this.counterInstanceId, counter);
- wTx.submit();
+ wTx.commit().addCallback(new FutureCallback<CommitInfo>() {
+ @Override
+ public void onSuccess(final CommitInfo result) {
+ LOG.debug("Data change count update stored");
+ }
+
+ @Override
+ public void onFailure(final Throwable trw) {
+ LOG.error("Failed to store Data change count");
+ }
+ }, MoreExecutors.directExecutor());
}
@Override
- public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
- final Throwable cause) {
- chain.close();
+ public synchronized void onTransactionChainFailed(final TransactionChain<?, ?> chain,
+ final AsyncTransaction<?, ?> transaction, final Throwable cause) {
LOG.warn("Transaction chain failure. Transaction: {}", transaction, cause);
+ if (!closed.get()) {
+ this.transactionChain.close();
+ this.transactionChain = dataBroker.createTransactionChain(this);
+ }
}
@Override
- public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
+ public synchronized void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
LOG.debug("Transaction chain successful. {}", chain);
}
}