package org.opendaylight.yangtools.util.concurrent;
+import com.google.common.annotations.Beta;
+
import java.util.Collection;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
/**
* A {@link LinkedBlockingQueue} that tracks the largest queue size for debugging.
* @param <E> the element t.ype
*/
public class TrackingLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {
-
+ @SuppressWarnings("rawtypes")
+ private static final AtomicIntegerFieldUpdater<TrackingLinkedBlockingQueue> LARGEST_QUEUE_SIZE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(TrackingLinkedBlockingQueue.class, "largestQueueSize");
private static final long serialVersionUID = 1L;
/**
* Holds largestQueueSize, this long field should be only accessed
- * using {@value #LARGEST_QUEUE_SIZE_UPDATER}
+ * using {@link #LARGEST_QUEUE_SIZE_UPDATER}
*/
- private volatile long largestQueueSize = 0;
-
- @SuppressWarnings("rawtypes")
- private static AtomicLongFieldUpdater<TrackingLinkedBlockingQueue> LARGEST_QUEUE_SIZE_UPDATER = AtomicLongFieldUpdater.newUpdater(TrackingLinkedBlockingQueue.class, "largestQueueSize");
+ private volatile int largestQueueSize = 0;
/**
* @see LinkedBlockingQueue#LinkedBlockingQueue
/**
* @see LinkedBlockingQueue#LinkedBlockingQueue(Collection)
*/
- public TrackingLinkedBlockingQueue( Collection<? extends E> c ) {
+ public TrackingLinkedBlockingQueue( final Collection<? extends E> c ) {
super(c);
}
/**
* @see LinkedBlockingQueue#LinkedBlockingQueue(int)
*/
- public TrackingLinkedBlockingQueue( int capacity ) {
+ public TrackingLinkedBlockingQueue( final int capacity ) {
super(capacity);
}
/**
* Returns the largest queue size.
+ *
+ * FIXME: the this return will be changed to int in a future release.
*/
- public long getLargestQueueSize(){
+ @Beta
+ public long getLargestQueueSize() {
return largestQueueSize;
}
@Override
- public boolean offer( E e, long timeout, TimeUnit unit ) throws InterruptedException {
+ public boolean offer( final E e, final long timeout, final TimeUnit unit ) throws InterruptedException {
if( super.offer( e, timeout, unit ) ) {
updateLargestQueueSize();
return true;
}
@Override
- public boolean offer( E e ) {
+ public boolean offer( final E e ) {
if( super.offer( e ) ) {
updateLargestQueueSize();
return true;
}
@Override
- public void put( E e ) throws InterruptedException {
+ public void put( final E e ) throws InterruptedException {
super.put( e );
updateLargestQueueSize();
}
@Override
- public boolean add( E e ) {
+ public boolean add( final E e ) {
boolean result = super.add( e );
updateLargestQueueSize();
return result;
}
@Override
- public boolean addAll( Collection<? extends E> c ) {
+ public boolean addAll( final Collection<? extends E> c ) {
try {
return super.addAll( c );
} finally {
}
private void updateLargestQueueSize() {
- long size = size();
- long largest = largestQueueSize;
- if( size > largest ) {
- LARGEST_QUEUE_SIZE_UPDATER.compareAndSet(this, largest, size );
- }
+ final int size = size();
+
+ int largest;
+ do {
+ largest = largestQueueSize;
+ } while (size > largest && !LARGEST_QUEUE_SIZE_UPDATER.weakCompareAndSet(this, largest, size));
}
}