Merge "BUG-1793: make sure we cache QNameModule"
authorTony Tkacik <ttkacik@cisco.com>
Thu, 11 Sep 2014 09:10:20 +0000 (09:10 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Thu, 11 Sep 2014 09:10:20 +0000 (09:10 +0000)
15 files changed:
common/util/src/main/java/org/opendaylight/yangtools/util/DurationStatsTracker.java
common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/AsyncNotifyingListenableFutureTask.java
common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/CountingRejectedExecutionHandler.java
common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/DeadlockDetectingListeningExecutorService.java
common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/SettableBoolean.java [new file with mode: 0644]
common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/SettableBooleanThreadLocal.java [new file with mode: 0644]
common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/TrackingLinkedBlockingQueue.java
yang/yang-data-codec-gson/src/main/java/org/opendaylight/yangtools/yang/data/codec/gson/CompositeNodeDataWithSchema.java
yang/yang-data-codec-gson/src/main/java/org/opendaylight/yangtools/yang/data/codec/gson/JSONNormalizedNodeStreamWriter.java
yang/yang-data-impl/src/main/java/org/opendaylight/yangtools/yang/data/impl/schema/nodes/AbstractImmutableDataContainerNode.java
yang/yang-model-api/pom.xml
yang/yang-model-api/src/main/java/org/opendaylight/yangtools/yang/model/repo/api/SourceIdentifier.java
yang/yang-model-api/src/main/java/org/opendaylight/yangtools/yang/model/repo/spi/PotentialSchemaSource.java
yang/yang-model-util/src/main/java/org/opendaylight/yangtools/yang/model/repo/util/AbstractSchemaRepository.java
yang/yang-parser-impl/src/main/java/org/opendaylight/yangtools/yang/parser/repo/SharedSchemaContextFactory.java

index 21690c2864e705817a6245dc2c9ec98ac1b21675..9a29dca6a8002eaf1b62f96a44baaa8c7a2d69ce 100644 (file)
@@ -13,12 +13,13 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
 
+import com.google.common.util.concurrent.AtomicDouble;
+import java.text.DecimalFormat;
+import java.text.DecimalFormatSymbols;
 import java.util.Date;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
-import com.google.common.util.concurrent.AtomicDouble;
-
 /**
  * Class that calculates and tracks time duration statistics.
  *
@@ -26,6 +27,8 @@ import com.google.common.util.concurrent.AtomicDouble;
  */
 public class DurationStatsTracker {
 
+    private static final DecimalFormat decimalFormat;
+
     private final AtomicLong totalDurations = new AtomicLong();
     private final AtomicLong longestDuration = new AtomicLong();
     private volatile long timeOfLongestDuration;
@@ -33,10 +36,17 @@ public class DurationStatsTracker {
     private volatile long timeOfShortestDuration;
     private final AtomicDouble averageDuration = new AtomicDouble();
 
+    static {
+        final DecimalFormatSymbols symbols = DecimalFormatSymbols.getInstance();
+        symbols.setDecimalSeparator('.');
+        decimalFormat = new DecimalFormat("0.00", symbols);
+    }
+
     /**
      * Add a duration to track.
      *
-     * @param duration the duration in nanoseconds.
+     * @param duration
+     *            the duration in nanoseconds.
      */
     public void addDuration(long duration) {
 
@@ -46,21 +56,21 @@ public class DurationStatsTracker {
         long newTotal = currentTotal + 1;
 
         // Calculate moving cumulative average.
-        double newAve = currentAve * (double)currentTotal / (double)newTotal + (double)duration / (double)newTotal;
+        double newAve = currentAve * currentTotal / newTotal + (double) duration / (double) newTotal;
 
         averageDuration.compareAndSet(currentAve, newAve);
         totalDurations.compareAndSet(currentTotal, newTotal);
 
         long longest = longestDuration.get();
-        if( duration > longest ) {
-            if(longestDuration.compareAndSet( longest, duration )) {
+        if (duration > longest) {
+            if (longestDuration.compareAndSet(longest, duration)) {
                 timeOfLongestDuration = System.currentTimeMillis();
             }
         }
 
         long shortest = shortestDuration.get();
-        if( duration < shortest ) {
-            if(shortestDuration.compareAndSet( shortest, duration )) {
+        if (duration < shortest) {
+            if (shortestDuration.compareAndSet(shortest, duration)) {
                 timeOfShortestDuration = System.currentTimeMillis();
             }
         }
@@ -122,46 +132,55 @@ public class DurationStatsTracker {
     }
 
     /**
-     * Returns the average duration as a displayable String with units, e.g. "12.34 ms".
+     * Returns the average duration as a displayable String with units, e.g.
+     * "12.34 ms".
      */
     public String getDisplayableAverageDuration() {
         return formatDuration(getAverageDuration(), 0);
     }
 
     /**
-     * Returns the shortest duration as a displayable String with units and the date/time at
-     * which it occurred, e.g. "12.34 ms at 08/02/2014 12:30:24".
+     * Returns the shortest duration as a displayable String with units and the
+     * date/time at which it occurred, e.g. "12.34 ms at 08/02/2014 12:30:24".
      */
     public String getDisplayableShortestDuration() {
         return formatDuration(getShortestDuration(), getTimeOfShortestDuration());
     }
 
     /**
-     * Returns the longest duration as a displayable String with units and the date/time at
-     * which it occurred, e.g. "12.34 ms at 08/02/2014 12:30:24".
+     * Returns the longest duration as a displayable String with units and the
+     * date/time at which it occurred, e.g. "12.34 ms at 08/02/2014 12:30:24".
      */
     public String getDisplayableLongestDuration() {
         return formatDuration(getLongestDuration(), getTimeOfLongestDuration());
     }
 
+    /**
+     * Returns formatted value of number, e.g. "12.34". Always is used dot as
+     * decimal separator.
+     */
+    private static synchronized String formatDecimalValue(double value) {
+        return decimalFormat.format(value);
+    }
+
     private String formatDuration(double duration, long timeStamp) {
-        TimeUnit unit = chooseUnit((long)duration);
+        TimeUnit unit = chooseUnit((long) duration);
         double value = duration / NANOSECONDS.convert(1, unit);
-        return timeStamp > 0 ?
-                String.format("%.4g %s at %3$tD %3$tT", value, abbreviate(unit), new Date(timeStamp)) :
-                String.format("%.4g %s", value, abbreviate(unit));
+
+        return timeStamp > 0 ? String.format("%s %s at %3$tD %3$tT", formatDecimalValue(value), abbreviate(unit),
+                new Date(timeStamp)) : String.format("%s %s", formatDecimalValue(value), abbreviate(unit));
     }
 
     private static TimeUnit chooseUnit(long nanos) {
-        if(SECONDS.convert(nanos, NANOSECONDS) > 0) {
+        if (SECONDS.convert(nanos, NANOSECONDS) > 0) {
             return SECONDS;
         }
 
-        if(MILLISECONDS.convert(nanos, NANOSECONDS) > 0) {
+        if (MILLISECONDS.convert(nanos, NANOSECONDS) > 0) {
             return MILLISECONDS;
         }
 
-        if(MICROSECONDS.convert(nanos, NANOSECONDS) > 0) {
+        if (MICROSECONDS.convert(nanos, NANOSECONDS) > 0) {
             return MICROSECONDS;
         }
 
@@ -169,17 +188,17 @@ public class DurationStatsTracker {
     }
 
     private static String abbreviate(TimeUnit unit) {
-        switch(unit) {
-            case NANOSECONDS:
-                return "ns";
-            case MICROSECONDS:
-                return "\u03bcs"; // μs
-            case MILLISECONDS:
-                return "ms";
-            case SECONDS:
-                return "s";
-            default:
-                return "";
+        switch (unit) {
+        case NANOSECONDS:
+            return "ns";
+        case MICROSECONDS:
+            return "\u03bcs"; // μs
+        case MILLISECONDS:
+            return "ms";
+        case SECONDS:
+            return "s";
+        default:
+            return "";
         }
     }
 }
index 69c94f32a35cfc1e7b1ec9bf71e6287bd88b7acc..2ba16931f18a8f687836415c8da213e2a6cc3bab 100644 (file)
@@ -8,18 +8,21 @@
 
 package org.opendaylight.yangtools.util.concurrent;
 
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ExecutionList;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableFutureTask;
+
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
 import java.util.concurrent.FutureTask;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.util.concurrent.ExecutionList;
-import com.google.common.util.concurrent.ListenableFuture;
-
 /**
  * A {@link FutureTask} that also implements the {@link ListenableFuture} interface similar to
  * guava's {@link ListenableFutureTask}. This class differs from ListenableFutureTask in that it
@@ -35,38 +38,91 @@ import com.google.common.util.concurrent.ListenableFuture;
  * listener Runnable would execute in the thread that completed this task, the listener
  * is executed on Executor specified on construction.
  *
+ * Also note that the use of this task may attach some (small) amount of state to the threads
+ * interacting with it. That state will not be detached automatically, but you can use
+ *  {@link #cleanStateForCurrentThread()} to clean it up.
+ *
  * @author Thomas Pantelis
+ * @author Robert Varga
  *
  * @param <V> the Future result value type
  */
 public class AsyncNotifyingListenableFutureTask<V> extends FutureTask<V> implements ListenableFuture<V> {
+    private static final class DelegatingAsyncNotifyingListenableFutureTask<V> extends AsyncNotifyingListenableFutureTask<V> {
+        /**
+         * The executor used to run listener callbacks.
+         */
+        private final Executor listenerExecutor;
+
+        private DelegatingAsyncNotifyingListenableFutureTask(final Callable<V> callable, @Nullable final Executor listenerExecutor) {
+            super(callable);
+            this.listenerExecutor = Preconditions.checkNotNull(listenerExecutor);
+        }
+
+        private DelegatingAsyncNotifyingListenableFutureTask(final Runnable runnable, @Nullable final V result,
+                @Nullable final Executor listenerExecutor) {
+            super(runnable, result);
+            this.listenerExecutor = Preconditions.checkNotNull(listenerExecutor);
+        }
+
+        @Override
+        public void addListener(final Runnable listener, final Executor executor) {
+            // Wrap the listener Runnable in a DelegatingRunnable. If the specified executor is one that
+            // runs tasks in the same thread as the caller submitting the task
+            // (e.g. {@link com.google.common.util.concurrent.MoreExecutors#sameThreadExecutor}) and the
+            // listener is executed from the #done method, then the DelegatingRunnable will detect this
+            // via the ThreadLocal and submit the listener Runnable to the listenerExecutor.
+            //
+            // On the other hand, if this task is already complete, the call to ExecutionList#add in
+            // superclass will execute the listener Runnable immediately and, since the ThreadLocal won't be set,
+            // the DelegatingRunnable will run the listener Runnable inline.
+            super.addListener(new DelegatingRunnable(listener, listenerExecutor), executor);
+        }
+    }
 
-    private static final Logger LOG = LoggerFactory.getLogger( AsyncNotifyingListenableFutureTask.class );
+    private static final class DelegatingRunnable implements Runnable {
+        private final Runnable delegate;
+        private final Executor executor;
+
+        DelegatingRunnable(final Runnable delegate, final Executor executor) {
+            this.delegate = Preconditions.checkNotNull(delegate);
+            this.executor = Preconditions.checkNotNull(executor);
+        }
+
+        @Override
+        public void run() {
+            if (ON_TASK_COMPLETION_THREAD_TL.get().isSet()) {
+                // We're running on the task completion thread so off-load to the executor.
+                LOG.trace("Submitting ListenenableFuture Runnable from thread {} to executor {}",
+                        Thread.currentThread().getName(), executor);
+                executor.execute(delegate);
+            } else {
+                // We're not running on the task completion thread so run the delegate inline.
+                LOG.trace("Executing ListenenableFuture Runnable on this thread: {}",
+                        Thread.currentThread().getName());
+                delegate.run();
+            }
+        }
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(AsyncNotifyingListenableFutureTask.class);
 
     /**
      * ThreadLocal used to detect if the task completion thread is running the listeners.
      */
-    private static final ThreadLocal<Boolean> ON_TASK_COMPLETION_THREAD_TL = new ThreadLocal<>();
+    private static final SettableBooleanThreadLocal ON_TASK_COMPLETION_THREAD_TL = new SettableBooleanThreadLocal();
 
     /**
      *  The execution list to hold our listeners.
      */
     private final ExecutionList executionList = new ExecutionList();
 
-    /**
-     * The executor used to run listener callbacks.
-     */
-    private final Executor listenerExecutor;
-
-    private AsyncNotifyingListenableFutureTask( Callable<V> callable, @Nullable Executor listenerExecutor ) {
-        super( callable );
-        this.listenerExecutor = listenerExecutor;
+    private AsyncNotifyingListenableFutureTask(final Callable<V> callable) {
+        super(callable);
     }
 
-    private AsyncNotifyingListenableFutureTask( Runnable runnable, @Nullable V result,
-            @Nullable Executor listenerExecutor ) {
-        super( runnable, result );
-        this.listenerExecutor = listenerExecutor;
+    private AsyncNotifyingListenableFutureTask(final Runnable runnable, @Nullable final V result) {
+        super(runnable, result);
     }
 
     /**
@@ -77,9 +133,13 @@ public class AsyncNotifyingListenableFutureTask<V> extends FutureTask<V> impleme
      * @param listenerExecutor the executor used to run listener callbacks asynchronously.
      *                         If null, no executor is used.
      */
-    public static <V> AsyncNotifyingListenableFutureTask<V> create( Callable<V> callable,
-            @Nullable Executor listenerExecutor ) {
-      return new AsyncNotifyingListenableFutureTask<V>( callable, listenerExecutor );
+    public static <V> AsyncNotifyingListenableFutureTask<V> create(final Callable<V> callable,
+            @Nullable final Executor listenerExecutor) {
+        if (listenerExecutor != null) {
+            return new DelegatingAsyncNotifyingListenableFutureTask<V>(callable, listenerExecutor);
+        } else {
+            return new AsyncNotifyingListenableFutureTask<V>(callable);
+        }
     }
 
     /**
@@ -92,25 +152,26 @@ public class AsyncNotifyingListenableFutureTask<V> extends FutureTask<V> impleme
      * @param listenerExecutor the executor used to run listener callbacks asynchronously.
      *                         If null, no executor is used.
      */
-    public static <V> AsyncNotifyingListenableFutureTask<V> create( Runnable runnable, @Nullable V result,
-            @Nullable Executor listenerExecutor ) {
-      return new AsyncNotifyingListenableFutureTask<V>( runnable, result, listenerExecutor );
+    public static <V> AsyncNotifyingListenableFutureTask<V> create(final Runnable runnable, @Nullable final V result,
+            @Nullable final Executor listenerExecutor) {
+        if (listenerExecutor != null) {
+            return new DelegatingAsyncNotifyingListenableFutureTask<V>(runnable, result, listenerExecutor);
+        } else {
+            return new AsyncNotifyingListenableFutureTask<V>(runnable, result);
+        }
     }
 
     @Override
-    public void addListener( Runnable listener, Executor executor ) {
-        // If a listenerExecutor was specified on construction, wrap the listener Runnable in a
-        // DelegatingRunnable. If the specified executor is one that runs tasks in the same thread
-        // as the caller submitting the task (eg MoreExecutors#sameThreadExecutor) and the
-        // listener is executed from the #done method, then the DelegatingRunnable will detect this
-        // via the ThreadLocal and submit the listener Runnable to the listenerExecutor.
-        //
-        // On the other hand, if this task is already complete, the call to ExecutionList#add below
-        // will execute the listener Runnable immediately and, since the ThreadLocal won't be set,
-        // the DelegatingRunnable will run the listener Runnable inline.
-
-        executionList.add( listenerExecutor == null ? listener :
-            new DelegatingRunnable( listener, listenerExecutor ), executor );
+    public void addListener(@Nonnull final Runnable listener, final Executor executor) {
+        executionList.add(listener, executor);
+    }
+
+    /**
+     * Remove the state which may have attached to the calling thread. If no state
+     * was attached this method does nothing.
+     */
+    public static void cleanStateForCurrentThread() {
+        ON_TASK_COMPLETION_THREAD_TL.remove();
     }
 
     /**
@@ -118,37 +179,13 @@ public class AsyncNotifyingListenableFutureTask<V> extends FutureTask<V> impleme
      */
     @Override
     protected void done() {
-        ON_TASK_COMPLETION_THREAD_TL.set( Boolean.TRUE );
+        final SettableBoolean b = ON_TASK_COMPLETION_THREAD_TL.get();
+        b.set();
+
         try {
             executionList.execute();
         } finally {
-            ON_TASK_COMPLETION_THREAD_TL.remove();
-        }
-    }
-
-    private static class DelegatingRunnable implements Runnable {
-
-        private final Runnable delegate;
-        private final Executor executor;
-
-        DelegatingRunnable( Runnable delegate, Executor executor ) {
-            this.delegate = delegate;
-            this.executor = executor;
-        }
-
-        @Override
-        public void run() {
-            if( ON_TASK_COMPLETION_THREAD_TL.get() == null ) {
-                // We're not running on the task completion thread so run the delegate inline.
-                LOG.trace( "Executing ListenenableFuture Runnable on this thread: {}",
-                        Thread.currentThread().getName() );
-                delegate.run();
-            } else {
-                // We're running on the task completion thread so off-load to the executor.
-                LOG.trace( "Submitting ListenenableFuture Runnable to the listenerExecutor",
-                        Thread.currentThread().getName() );
-                executor.execute( delegate );
-            }
+            b.reset();
         }
     }
 }
index ab010c964de2a7ec32a12c7ef40458068ede6c5e..c44864350e863dc639e307d23fc343efbc38bfc1 100644 (file)
@@ -8,14 +8,14 @@
 
 package org.opendaylight.yangtools.util.concurrent;
 
+import com.google.common.base.Preconditions;
+
 import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 
 import org.opendaylight.yangtools.util.ExecutorServiceUtil;
 
-import com.google.common.base.Preconditions;
-
 /**
  * A RejectedExecutionHandler that delegates to a backing RejectedExecutionHandler and counts the
  * number of rejected tasks.
@@ -23,30 +23,31 @@ import com.google.common.base.Preconditions;
  * @author Thomas Pantelis
  */
 public class CountingRejectedExecutionHandler implements RejectedExecutionHandler {
-
+    private static final AtomicLongFieldUpdater<CountingRejectedExecutionHandler> COUNTER_UPDATER =
+            AtomicLongFieldUpdater.newUpdater(CountingRejectedExecutionHandler.class, "rejectedTaskCounter");
     private final RejectedExecutionHandler delegate;
-    private final AtomicLong rejectedTaskCounter = new AtomicLong();
+    private volatile long rejectedTaskCounter;
 
     /**
      * Constructor.
      *
      * @param delegate the backing RejectedExecutionHandler.
      */
-    public CountingRejectedExecutionHandler( RejectedExecutionHandler delegate ) {
+    public CountingRejectedExecutionHandler( final RejectedExecutionHandler delegate ) {
         this.delegate = Preconditions.checkNotNull( delegate );
     }
 
     @Override
-    public void rejectedExecution( Runnable task, ThreadPoolExecutor executor ) {
-        rejectedTaskCounter.incrementAndGet();
+    public void rejectedExecution( final Runnable task, final ThreadPoolExecutor executor ) {
+        COUNTER_UPDATER.incrementAndGet(this);
         delegate.rejectedExecution( task, executor );
     }
 
     /**
      * Returns the rejected task count.
      */
-    public long getRejectedTaskCount(){
-        return rejectedTaskCounter.get();
+    public long getRejectedTaskCount() {
+        return rejectedTaskCounter;
     }
 
     /**
index 011872d6b138d9edbaf28869524121bd859f0b43..958f2ee5118b265ccdde95cd8ed9891fdde373bb 100644 (file)
@@ -8,11 +8,12 @@
 
 package org.opendaylight.yangtools.util.concurrent;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-
 import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
 import com.google.common.util.concurrent.ForwardingListenableFuture;
 import com.google.common.util.concurrent.ListenableFuture;
+
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
@@ -20,6 +21,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 /**
@@ -42,11 +44,35 @@ import javax.annotation.Nullable;
  * from this class override the <code>get</code> methods to check if the ThreadLocal is set. If it is,
  * an ExecutionException is thrown with a custom cause.
  *
+ * Note that the ThreadLocal is not removed automatically, so some state may be left hanging off of
+ * threads which have encountered this class. If you need to clean that state up, use
+ * {@link #cleanStateForCurrentThread()}.
+ *
  * @author Thomas Pantelis
+ * @author Robert Varga
  */
 public class DeadlockDetectingListeningExecutorService extends AsyncNotifyingListeningExecutorService {
-    private final ThreadLocal<Boolean> deadlockDetector = new ThreadLocal<>();
-    private final Function<Void, Exception> deadlockExceptionFunction;
+    /*
+     * We cannot use a static field simply because our API contract allows nesting, which means some
+     * tasks may be submitted to underlay and some to overlay service -- and the two cases need to
+     * be discerned reliably.
+     */
+    private final SettableBooleanThreadLocal deadlockDetector = new SettableBooleanThreadLocal();
+    private final Supplier<Exception> deadlockExceptionFunction;
+
+    // Compatibility wrapper, needs to be removed once the deprecated constructors are gone.
+    private static final class CompatExceptionSupplier implements Supplier<Exception> {
+        private final Function<Void, Exception> function;
+
+        private CompatExceptionSupplier(final Function<Void, Exception> function) {
+            this.function = Preconditions.checkNotNull(function);
+        }
+
+        @Override
+        public Exception get() {
+            return function.apply(null);
+        }
+    }
 
     /**
      * Constructor.
@@ -54,9 +80,11 @@ public class DeadlockDetectingListeningExecutorService extends AsyncNotifyingLis
      * @param delegate the backing ExecutorService.
      * @param deadlockExceptionFunction Function that returns an Exception instance to set as the
      *             cause of the ExecutionException when a deadlock is detected.
+     * @deprecated Use {@link #DeadlockDetectingListeningExecutorService(ExecutorService, Supplier)} instead.
      */
-    public DeadlockDetectingListeningExecutorService( ExecutorService delegate,
-                                          Function<Void,Exception> deadlockExceptionFunction ) {
+    @Deprecated
+    public DeadlockDetectingListeningExecutorService(final ExecutorService delegate,
+            final Function<Void, Exception> deadlockExceptionFunction) {
         this(delegate, deadlockExceptionFunction, null);
     }
 
@@ -68,43 +96,88 @@ public class DeadlockDetectingListeningExecutorService extends AsyncNotifyingLis
      *             cause of the ExecutionException when a deadlock is detected.
      * @param listenableFutureExecutor the executor used to run listener callbacks asynchronously.
      *             If null, no executor is used.
+     * @deprecated Use {@link #DeadlockDetectingListeningExecutorService(ExecutorService, Supplier, Executor)} instead.
      */
-    public DeadlockDetectingListeningExecutorService( ExecutorService delegate,
-                                          Function<Void,Exception> deadlockExceptionFunction,
-                                          @Nullable Executor listenableFutureExecutor ) {
+    @Deprecated
+    public DeadlockDetectingListeningExecutorService(final ExecutorService delegate,
+            final Function<Void, Exception> deadlockExceptionFunction,
+            @Nullable final Executor listenableFutureExecutor) {
         super(delegate, listenableFutureExecutor);
-        this.deadlockExceptionFunction = checkNotNull(deadlockExceptionFunction);
+        this.deadlockExceptionFunction = new CompatExceptionSupplier(deadlockExceptionFunction);
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param delegate the backing ExecutorService.
+     * @param deadlockExceptionSupplier Supplier that returns an Exception instance to set as the
+     *             cause of the ExecutionException when a deadlock is detected.
+     */
+    public DeadlockDetectingListeningExecutorService(final ExecutorService delegate,
+            @Nonnull final Supplier<Exception> deadlockExceptionSupplier) {
+        this(delegate, deadlockExceptionSupplier, null);
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param delegate the backing ExecutorService.
+     * @param deadlockExceptionSupplier Supplier that returns an Exception instance to set as the
+     *             cause of the ExecutionException when a deadlock is detected.
+     * @param listenableFutureExecutor the executor used to run listener callbacks asynchronously.
+     *             If null, no executor is used.
+     */
+    public DeadlockDetectingListeningExecutorService(final ExecutorService delegate,
+            @Nonnull final Supplier<Exception> deadlockExceptionSupplier,
+            @Nullable final Executor listenableFutureExecutor ) {
+        super(delegate, listenableFutureExecutor);
+        this.deadlockExceptionFunction = Preconditions.checkNotNull(deadlockExceptionSupplier);
     }
 
     @Override
-    public void execute( Runnable command ){
+    public void execute(final Runnable command) {
         getDelegate().execute(wrapRunnable(command));
     }
 
     @Override
-    public <T> ListenableFuture<T> submit( Callable<T> task ){
+    public <T> ListenableFuture<T> submit(final Callable<T> task) {
         return wrapListenableFuture(super.submit(wrapCallable(task)));
     }
 
     @Override
-    public ListenableFuture<?> submit( Runnable task ){
+    public ListenableFuture<?> submit(final Runnable task) {
         return wrapListenableFuture(super.submit(wrapRunnable(task)));
     }
 
     @Override
-    public <T> ListenableFuture<T> submit( Runnable task, T result ){
+    public <T> ListenableFuture<T> submit(final Runnable task, final T result) {
         return wrapListenableFuture(super.submit(wrapRunnable(task), result));
     }
 
+    /**
+     * Remove the state this instance may have attached to the calling thread. If no state
+     * was attached this method does nothing.
+     */
+    public void cleanStateForCurrentThread() {
+        deadlockDetector.remove();
+    }
+
+    private SettableBoolean primeDetector() {
+        final SettableBoolean b = deadlockDetector.get();
+        Preconditions.checkState(!b.isSet(), "Detector for {} has already been primed", this);
+        b.set();
+        return b;
+    }
+
     private Runnable wrapRunnable(final Runnable task) {
         return new Runnable() {
             @Override
             public void run() {
-                deadlockDetector.set(Boolean.TRUE);
+                final SettableBoolean b = primeDetector();
                 try {
                     task.run();
                 } finally {
-                    deadlockDetector.remove();
+                    b.reset();
                 }
             }
         };
@@ -114,17 +187,17 @@ public class DeadlockDetectingListeningExecutorService extends AsyncNotifyingLis
         return new Callable<T>() {
             @Override
             public T call() throws Exception {
-                deadlockDetector.set(Boolean.TRUE);
+                final SettableBoolean b = primeDetector();
                 try {
                     return delagate.call();
                 } finally {
-                    deadlockDetector.remove();
+                    b.reset();
                 }
             }
         };
     }
 
-    private <T> ListenableFuture<T> wrapListenableFuture(final ListenableFuture<T> delegate ) {
+    private <T> ListenableFuture<T> wrapListenableFuture(final ListenableFuture<T> delegate) {
         /*
          * This creates a forwarding Future that overrides calls to get(...) to check, via the
          * ThreadLocal, if the caller is doing a blocking call on a thread from this executor. If
@@ -148,9 +221,9 @@ public class DeadlockDetectingListeningExecutorService extends AsyncNotifyingLis
             }
 
             void checkDeadLockDetectorTL() throws ExecutionException {
-                if (deadlockDetector.get() != null) {
+                if (deadlockDetector.get().isSet()) {
                     throw new ExecutionException("A potential deadlock was detected.",
-                            deadlockExceptionFunction.apply(null));
+                            deadlockExceptionFunction.get());
                 }
             }
         };
diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/SettableBoolean.java b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/SettableBoolean.java
new file mode 100644 (file)
index 0000000..0d584af
--- /dev/null
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.yangtools.util.concurrent;
+
+/**
+ * Simple container encapsulating a boolean flag, which can be toggled. It starts
+ * off in the reset state.
+ */
+final class SettableBoolean {
+    private boolean value = false;
+
+    /**
+     * Set the flag to its initial (false) state.
+     */
+    public void reset() {
+        value = false;
+    }
+
+    /**
+     * Set the flag.
+     */
+    public void set() {
+        value = true;
+    }
+
+    /**
+     * Query the flag.
+     *
+     * @return True if the flag has been set since instantiation or last {@link #reset()}.
+     */
+    public boolean isSet() {
+        return value;
+    }
+}
diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/SettableBooleanThreadLocal.java b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/SettableBooleanThreadLocal.java
new file mode 100644 (file)
index 0000000..8826f99
--- /dev/null
@@ -0,0 +1,23 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.yangtools.util.concurrent;
+
+/**
+ * A reusable {@link ThreadLocal} which returns a {@link SettableBoolean}.
+ */
+final class SettableBooleanThreadLocal extends ThreadLocal<SettableBoolean> {
+    @Override
+    protected SettableBoolean initialValue() {
+        return new SettableBoolean();
+    }
+
+    @Override
+    public void set(final SettableBoolean value) {
+        throw new UnsupportedOperationException("Resetting the value is not supported");
+    }
+}
index 38b5d9017fd65968c8464d6dd7b999dc7f760320..853a0aae0ebeb1d08f7145af5b30a55fac2b2328 100644 (file)
@@ -8,11 +8,12 @@
 
 package org.opendaylight.yangtools.util.concurrent;
 
+import com.google.common.annotations.Beta;
+
 import java.util.Collection;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 /**
  * A {@link LinkedBlockingQueue} that tracks the largest queue size for debugging.
@@ -22,17 +23,15 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater;
  * @param <E> the element t.ype
  */
 public class TrackingLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {
-
+    @SuppressWarnings("rawtypes")
+    private static final AtomicIntegerFieldUpdater<TrackingLinkedBlockingQueue> LARGEST_QUEUE_SIZE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(TrackingLinkedBlockingQueue.class, "largestQueueSize");
     private static final long serialVersionUID = 1L;
 
     /**
      * Holds largestQueueSize, this long field should be only accessed
      * using {@value #LARGEST_QUEUE_SIZE_UPDATER}
      */
-    private volatile long largestQueueSize = 0;
-
-    @SuppressWarnings("rawtypes")
-    private static AtomicLongFieldUpdater<TrackingLinkedBlockingQueue> LARGEST_QUEUE_SIZE_UPDATER = AtomicLongFieldUpdater.newUpdater(TrackingLinkedBlockingQueue.class, "largestQueueSize");
+    private volatile int largestQueueSize = 0;
 
     /**
      * @see LinkedBlockingQueue#LinkedBlockingQueue
@@ -44,26 +43,29 @@ public class TrackingLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {
     /**
      * @see LinkedBlockingQueue#LinkedBlockingQueue(Collection)
      */
-    public TrackingLinkedBlockingQueue( Collection<? extends E> c ) {
+    public TrackingLinkedBlockingQueue( final Collection<? extends E> c ) {
         super(c);
     }
 
     /**
      * @see LinkedBlockingQueue#LinkedBlockingQueue(int)
      */
-    public TrackingLinkedBlockingQueue( int capacity ) {
+    public TrackingLinkedBlockingQueue( final int capacity ) {
         super(capacity);
     }
 
     /**
      * Returns the largest queue size.
+     *
+     * FIXME: the this return will be changed to int in a future release.
      */
-    public long getLargestQueueSize(){
+    @Beta
+    public long getLargestQueueSize() {
         return largestQueueSize;
     }
 
     @Override
-    public boolean offer( E e, long timeout, TimeUnit unit ) throws InterruptedException {
+    public boolean offer( final E e, final long timeout, final TimeUnit unit ) throws InterruptedException {
         if( super.offer( e, timeout, unit ) ) {
             updateLargestQueueSize();
             return true;
@@ -73,7 +75,7 @@ public class TrackingLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {
     }
 
     @Override
-    public boolean offer( E e ) {
+    public boolean offer( final E e ) {
         if( super.offer( e ) ) {
             updateLargestQueueSize();
             return true;
@@ -83,20 +85,20 @@ public class TrackingLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {
     }
 
     @Override
-    public void put( E e ) throws InterruptedException {
+    public void put( final E e ) throws InterruptedException {
         super.put( e );
         updateLargestQueueSize();
     }
 
     @Override
-    public boolean add( E e ) {
+    public boolean add( final E e ) {
         boolean result = super.add( e );
         updateLargestQueueSize();
         return result;
     }
 
     @Override
-    public boolean addAll( Collection<? extends E> c ) {
+    public boolean addAll( final Collection<? extends E> c ) {
         try {
             return super.addAll( c );
         } finally {
@@ -105,10 +107,11 @@ public class TrackingLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {
     }
 
     private void updateLargestQueueSize() {
-        long size = size();
-        long largest = largestQueueSize;
-        if( size > largest ) {
-            LARGEST_QUEUE_SIZE_UPDATER.compareAndSet(this, largest, size );
-        }
+        final int size = size();
+
+        int largest;
+        do {
+            largest = largestQueueSize;
+        } while (size > largest && !LARGEST_QUEUE_SIZE_UPDATER.compareAndSet(this, largest, size));
     }
 }
index c2997a84d74814feae68fec7fc76adc9122e66b7..ee2541d8e651888fd4062db56828e0046b3a8168 100644 (file)
@@ -13,16 +13,13 @@ import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Multimap;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Deque;
 import java.util.List;
 import java.util.Map.Entry;
-
 import javax.annotation.Nonnull;
-
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.AugmentationIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
@@ -193,7 +190,7 @@ class CompositeNodeDataWithSchema extends AbstractNodeDataWithSchema {
      * node is found then it is returned, else null.
      */
     AugmentationSchema findCorrespondingAugment(final DataSchemaNode parent, final DataSchemaNode child) {
-        if (parent instanceof AugmentationTarget) {
+        if (parent instanceof AugmentationTarget && !((parent instanceof ChoiceCaseNode) || (parent instanceof ChoiceNode))) {
             for (AugmentationSchema augmentation : ((AugmentationTarget) parent).getAvailableAugmentations()) {
                 DataSchemaNode childInAugmentation = augmentation.getDataChildByName(child.getQName());
                 if (childInAugmentation != null) {
index 058ad5b78e8cb95729dbf90025ed4b3cdcd87e45..643e9de9ad041f9b2c4c3c02bf6e3d3317061295 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.yangtools.yang.data.codec.gson;
 
+import com.google.common.base.CharMatcher;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.gson.stream.JsonWriter;
@@ -41,6 +42,11 @@ public class JSONNormalizedNodeStreamWriter implements NormalizedNodeStreamWrite
      */
     private static final boolean DEFAULT_EMIT_EMPTY_CONTAINERS = true;
 
+    /**
+     * Matcher used to check if a string needs to be escaped.
+     */
+    private static final CharMatcher QUOTES_OR_BACKSLASH = CharMatcher.anyOf("\\\"");
+
     private final SchemaTracker tracker;
     private final JSONCodecFactory codecs;
     private final Writer writer;
@@ -222,7 +228,24 @@ public class JSONNormalizedNodeStreamWriter implements NormalizedNodeStreamWrite
     private void writeValue(final String str, final boolean needQuotes) throws IOException {
         if (needQuotes) {
             writer.append('"');
-            writer.append(str);
+
+            final int needEscape = QUOTES_OR_BACKSLASH.countIn(str);
+            if (needEscape != 0) {
+                final char[] escaped = new char[str.length() + needEscape];
+                int offset = 0;
+
+                for (int i = 0; i < str.length(); i++) {
+                    final char c = str.charAt(i);
+                    if (QUOTES_OR_BACKSLASH.matches(c)) {
+                        escaped[offset++] = '\\';
+                    }
+                    escaped[offset++] = c;
+                }
+                writer.write(escaped);
+            } else {
+                writer.append(str);
+            }
+
             writer.append('"');
         } else {
             writer.append(str);
index cd43988642ea28a89e688afea014c298fba41928..01bfe32b2811d9afaf93040f6c8c112af899bdf7 100644 (file)
@@ -16,18 +16,41 @@ import org.opendaylight.yangtools.concepts.Immutable;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public abstract class AbstractImmutableDataContainerNode<K extends PathArgument> extends AbstractImmutableNormalizedNode<K, Iterable<DataContainerChild<? extends PathArgument, ?>>>
-implements Immutable, DataContainerNode<K> {
-
-    protected final Map<PathArgument, DataContainerChild<? extends PathArgument, ?>> children;
-    private final Map<PathArgument, DataContainerChild<? extends PathArgument, ?>> publicChildren;
+public abstract class AbstractImmutableDataContainerNode<K extends PathArgument> extends AbstractImmutableNormalizedNode<K, Iterable<DataContainerChild<? extends PathArgument, ?>>> implements Immutable, DataContainerNode<K> {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractImmutableDataContainerNode.class);
+    private final Map<PathArgument, DataContainerChild<? extends PathArgument, ?>> children;
 
     public AbstractImmutableDataContainerNode(
             final Map<PathArgument, DataContainerChild<? extends PathArgument, ?>> children, final K nodeIdentifier) {
         super(nodeIdentifier);
-        this.children = children;
-        this.publicChildren = Collections.unmodifiableMap(children);
+
+        /*
+         * There is a code path where AbstractImmutableDataContainerNodeBuilder can reflect
+         * the collection acquired via getChildren() back to us. This is typically the case
+         * in the datastore where transactions cancel each other out, leaving an unmodified
+         * node. In that case we want to skip wrapping the map again (and again and again).
+         *
+         * In a perfect world, Collection.unmodifiableMap() would be doing the instanceof
+         * check which would stop the proliferation. Unfortunately this not the case and the
+         * 'unmodifiable' trait is not exposed by anything we can query. Furthermore the API
+         * contract there is sufficiently vague so an implementation may actually return a
+         * different implementation based on input map -- for example
+         * Collections.unmodifiableMap(Collections.emptyMap()) returning the same thing as
+         * Collections.emptyMap().
+         *
+         * This means that we have to perform the instantiation here (as opposed to once at
+         * class load time) and then compare the classes.
+         */
+        final Map<PathArgument, DataContainerChild<? extends PathArgument, ?>> pub = Collections.unmodifiableMap(children);
+        if (children.getClass().equals(pub.getClass())) {
+            LOG.trace("Reusing already-unmodifiable children {}", children);
+            this.children = children;
+        } else {
+            this.children = pub;
+        }
     }
 
     @Override
@@ -37,7 +60,7 @@ implements Immutable, DataContainerNode<K> {
 
     @Override
     public final Iterable<DataContainerChild<? extends PathArgument, ?>> getValue() {
-        return publicChildren.values();
+        return children.values();
     }
 
     @Override
@@ -46,7 +69,7 @@ implements Immutable, DataContainerNode<K> {
     }
 
     public final Map<PathArgument, DataContainerChild<? extends PathArgument, ?>> getChildren() {
-        return publicChildren;
+        return children;
     }
 
     @Override
index 4ad092e582d0196c434625a7a2dc3493bcc3642b..a993efa6cf4437bf3fa5ee71a305ede41d95ccf3 100644 (file)
     <description>${project.artifactId}</description>
 
     <dependencies>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>object-cache-api</artifactId>
+        </dependency>
         <dependency>
             <groupId>${project.groupId}</groupId>
             <artifactId>util</artifactId>
index 4e1fd00f20750388fb4af2579bc7af8f65ef9916..44a8cceee7609533e079e4aa4c64fa318469918e 100644 (file)
@@ -13,6 +13,8 @@ import com.google.common.base.Preconditions;
 
 import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.yangtools.concepts.Immutable;
+import org.opendaylight.yangtools.objcache.ObjectCache;
+import org.opendaylight.yangtools.objcache.ObjectCacheFactory;
 
 /**
  * YANG Schema source identifier
@@ -37,26 +39,24 @@ import org.opendaylight.yangtools.concepts.Immutable;
  */
 @Beta
 public final class SourceIdentifier implements Identifier, Immutable {
-
     /**
      * Default revision for sources without specified revision.
      * Marks the source as oldest.
      */
     public static final String NOT_PRESENT_FORMATTED_REVISION = "0000-00-00";
 
+    private static final ObjectCache CACHE = ObjectCacheFactory.getObjectCache(SourceIdentifier.class);
     private static final long serialVersionUID = 1L;
     private final String revision;
     private final String name;
 
     /**
-     *
      * Creates new YANG Schema source identifier.
      *
      * @param name Name of schema
      * @param formattedRevision Revision of source in format YYYY-mm-dd
      */
     public SourceIdentifier(final String name, final String formattedRevision) {
-        super();
         this.name = Preconditions.checkNotNull(name);
         this.revision = Preconditions.checkNotNull(formattedRevision);
     }
@@ -72,6 +72,15 @@ public final class SourceIdentifier implements Identifier, Immutable {
         this(name, formattedRevision.or(NOT_PRESENT_FORMATTED_REVISION));
     }
 
+    /**
+     * Return a cached reference to an object equal to this object.
+     *
+     * @return A potentially shared reference, not guaranteed to be unique.
+     */
+    public SourceIdentifier cachedReference() {
+        return CACHE.getReference(this);
+    }
+
     /**
      *
      * Creates new YANG Schema source identifier for sources without revision.
@@ -83,7 +92,6 @@ public final class SourceIdentifier implements Identifier, Immutable {
         this(name, NOT_PRESENT_FORMATTED_REVISION);
     }
 
-
     /**
      * Returns model name
      *
index fae44679e2a750b413980003dddaa93b94d5cc25..fa2758f4e3ab5679ef47dca107170a3ab761032d 100644 (file)
@@ -10,6 +10,8 @@ package org.opendaylight.yangtools.yang.model.repo.spi;
 import com.google.common.annotations.Beta;
 import com.google.common.base.Preconditions;
 
+import org.opendaylight.yangtools.objcache.ObjectCache;
+import org.opendaylight.yangtools.objcache.ObjectCacheFactory;
 import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceRepresentation;
 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
 
@@ -62,6 +64,7 @@ public final class PotentialSchemaSource<T extends SchemaSourceRepresentation> {
         }
     }
 
+    private static final ObjectCache CACHE = ObjectCacheFactory.getObjectCache(PotentialSchemaSource.class);
     private final Class<? extends T> representation;
     private final SourceIdentifier sourceIdentifier;
     private final int cost;
@@ -77,6 +80,15 @@ public final class PotentialSchemaSource<T extends SchemaSourceRepresentation> {
         return new PotentialSchemaSource<>(sourceIdentifier, representation, cost);
     }
 
+    /**
+     * Return a cached reference to an object equal to this object.
+     *
+     * @return A potentially shared reference, not guaranteed to be unique.
+     */
+    public PotentialSchemaSource<T> cachedReference() {
+        return CACHE.getReference(this);
+    }
+
     public SourceIdentifier getSourceIdentifier() {
         return sourceIdentifier;
     }
index 722ac918e08c995d391546d085fcf6b4aa36b95b..d6058292afb5ad4646d0fc6e31ca96673e207195 100644 (file)
@@ -153,14 +153,16 @@ public abstract class AbstractSchemaRepository implements SchemaRepository, Sche
 
     @Override
     public <T extends SchemaSourceRepresentation> SchemaSourceRegistration<T> registerSchemaSource(final SchemaSourceProvider<? super T> provider, final PotentialSchemaSource<T> source) {
-        final AbstractSchemaSourceRegistration<T> ret = new AbstractSchemaSourceRegistration<T>(provider, source) {
+        final PotentialSchemaSource<T> src = source.cachedReference();
+
+        final AbstractSchemaSourceRegistration<T> ret = new AbstractSchemaSourceRegistration<T>(provider, src) {
             @Override
             protected void removeRegistration() {
-                removeSource(source, this);
+                removeSource(src, this);
             }
         };
 
-        addSource(source, ret);
+        addSource(src, ret);
         return ret;
     }
 
index 3281f15acf9ef82fc639544aa0c691221ad7b955..3c8a45e26dca59f7f100b8f7fbc4f4289af54e73 100644 (file)
@@ -134,10 +134,10 @@ final class SharedSchemaContextFactory implements SchemaContextFactory {
 
             @Override
             public void onFailure(final Throwable t) {
-                LOG.info("Failed to assemble sources", t);
+                LOG.debug("Failed to assemble sources", t);
             }
         });
 
         return Futures.makeChecked(cf, MAPPER);
     }
-}
\ No newline at end of file
+}