--- /dev/null
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync;
+
+import java.util.concurrent.Semaphore;
+import javax.annotation.Nonnull;
+
+/**
+ * Proposal for how a key based semaphore provider should look like.
+ * <ul>
+ * <li>thread safe</li>
+ * <li>garbage-collect unused semaphores</li>
+ * <li>for the same key there must be always only one semaphore available</li>
+ * </ul>
+ *
+ *
+ * usage:
+ * <pre>
+ * final Semaphore guard = semaphoreKeeper.summonGuard(key);
+ * guard.acquire();
+ * // guard protected logic ...
+ * guard.release();
+ * </pre>
+ *
+ * @param <K> key type
+ */
+
+public interface SemaphoreKeeper<K> {
+ /**
+ * @param key semaphore identifier
+ * @return new or existing semaphore for given key, for one key there is always only one semaphore available
+ */
+ Semaphore summonGuard(@Nonnull K key);
+}
package org.opendaylight.openflowplugin.applications.frsync.impl;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.lang.Thread.UncaughtExceptionHandler;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
+import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeCachedDao;
import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeDao;
import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeOdlDao;
import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeSnapshotDao;
+import org.opendaylight.openflowplugin.applications.frsync.util.SemaphoreKeeperGuavaImpl;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.FlowCapableTransactionService;
broker.registerProvider(this);
}
+ private final ListeningExecutorService syncThreadPool = FrmExecutors.instance()
+ // TODO improve log in ThreadPoolExecutor.afterExecute
+ // TODO max bloking queue size
+ // TODO core/min pool size
+ .newFixedThreadPool(6, new ThreadFactoryBuilder()
+ .setNameFormat(SyncReactorFutureDecorator.FRM_RPC_CLIENT_PREFIX + "%d")
+ .setDaemon(false)
+ .setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+ @Override
+ public void uncaughtException(Thread thread, Throwable e) {
+ LOG.error("uncaught exception {}", thread, e);
+ }
+ })
+ .build());
+
@Override
public void onSessionInitiated(final BindingAwareBroker.ProviderContext providerContext) {
final FlowForwarder flowForwarder = new FlowForwarder(salFlowService);
{
final SyncReactorImpl syncReactorImpl = new SyncReactorImpl();
-
+ final SyncReactor syncReactorGuard = new SyncReactorGuardDecorator(syncReactorImpl
+ .setFlowForwarder(flowForwarder)
+ .setGroupForwarder(groupForwarder)
+ .setMeterForwarder(meterForwarder)
+ .setTableForwarder(tableForwarder)
+ .setTransactionService(transactionService),
+ new SemaphoreKeeperGuavaImpl<InstanceIdentifier<FlowCapableNode>>(1, true));
+
+ final SyncReactor cfgReactor = new SyncReactorFutureWithCompressionDecorator(syncReactorGuard, syncThreadPool);
+ final SyncReactor operReactor = new SyncReactorFutureWithCompressionDecorator(syncReactorGuard, syncThreadPool);
+
final FlowCapableNodeSnapshotDao configSnapshot = new FlowCapableNodeSnapshotDao();
final FlowCapableNodeSnapshotDao operationalSnapshot = new FlowCapableNodeSnapshotDao();
final FlowCapableNodeDao configDao = new FlowCapableNodeCachedDao(configSnapshot,
--- /dev/null
+/**\r
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.\r
+ *\r
+ * This program and the accompanying materials are made available under the\r
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
+ * and is available at http://www.eclipse.org/legal/epl-v10.html\r
+ */\r
+\r
+package org.opendaylight.openflowplugin.applications.frsync.impl;\r
+\r
+import java.util.concurrent.ExecutorService;\r
+import java.util.concurrent.Executors;\r
+import java.util.concurrent.ThreadFactory;\r
+\r
+import com.google.common.annotations.VisibleForTesting;\r
+import com.google.common.util.concurrent.ListeningExecutorService;\r
+import com.google.common.util.concurrent.MoreExecutors;\r
+\r
+/**\r
+ * Static Factory for creating ExecutorServicess (because there is no dependency injection but\r
+ * static getInstance).\r
+ */\r
+public final class FrmExecutors {\r
+ public static PceExecursFactory instance() {\r
+ return DEFAULT_EXECUTORS;\r
+ }\r
+\r
+ public interface PceExecursFactory {\r
+\r
+ public ListeningExecutorService newFixedThreadPool(int nThreads, ThreadFactory factory);\r
+ }\r
+\r
+ /**\r
+ * This will be rewritten in JUnits using SynchronousExecutorService\r
+ */\r
+ @VisibleForTesting // should not be private and final\r
+ static PceExecursFactory DEFAULT_EXECUTORS = new PceExecursFactory() {\r
+\r
+ public ListeningExecutorService newFixedThreadPool(int nThreads, ThreadFactory factory) {\r
+ final ExecutorService executorService = Executors.newFixedThreadPool(nThreads, factory);\r
+ return MoreExecutors.listeningDecorator(executorService);\r
+ }\r
+ };\r
+}\r
--- /dev/null
+/**\r
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.\r
+ *\r
+ * This program and the accompanying materials are made available under the\r
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
+ * and is available at http://www.eclipse.org/legal/epl-v10.html\r
+ */\r
+\r
+package org.opendaylight.openflowplugin.applications.frsync.impl;\r
+\r
+import java.util.concurrent.Callable;\r
+import java.util.concurrent.TimeUnit;\r
+import java.util.concurrent.TimeoutException;\r
+\r
+import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;\r
+import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;\r
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;\r
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;\r
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+\r
+import com.google.common.util.concurrent.ListenableFuture;\r
+import com.google.common.util.concurrent.ListeningExecutorService;\r
+\r
+/**\r
+ * Decorator for running delegate syncup in Future.\r
+ */\r
+public class SyncReactorFutureDecorator implements SyncReactor {\r
+\r
+ private static final Logger LOG = LoggerFactory.getLogger(SyncReactorFutureDecorator.class);\r
+\r
+ private final SyncReactor delegate;\r
+ private final ListeningExecutorService executorService;\r
+\r
+ public static final String FRM_RPC_CLIENT_PREFIX = "FRM-RPC-client-";\r
+\r
+ public SyncReactorFutureDecorator(SyncReactor delegate, ListeningExecutorService executorService) {\r
+ this.delegate = delegate;\r
+ this.executorService = executorService;\r
+ }\r
+\r
+ public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,\r
+ final FlowCapableNode configTree, final FlowCapableNode operationalTree) throws InterruptedException {\r
+ final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);\r
+ LOG.trace("syncup {}", nodeId.getValue());\r
+\r
+ final ListenableFuture<Boolean> syncup = executorService.submit(new Callable<Boolean>() {\r
+ public Boolean call() throws Exception {\r
+ final String oldThreadName = updateThreadName(nodeId);\r
+\r
+ try {\r
+ final Boolean ret = doSyncupInFuture(flowcapableNodePath, configTree, operationalTree)\r
+ .get(10000, TimeUnit.MILLISECONDS);\r
+ LOG.trace("ret {} {}", nodeId.getValue(), ret);\r
+ return true;\r
+ } catch (TimeoutException e) {\r
+ LOG.error("doSyncupInFuture timeout occured {}", nodeId.getValue(), e);\r
+ return false;\r
+ } finally {\r
+ updateThreadName(oldThreadName);\r
+ }\r
+ }\r
+ });\r
+ \r
+ return syncup;\r
+ }\r
+\r
+ protected ListenableFuture<Boolean> doSyncupInFuture(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,\r
+ final FlowCapableNode configTree, final FlowCapableNode operationalTree)\r
+ throws InterruptedException {\r
+ final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);\r
+ LOG.trace("doSyncupInFuture {}", nodeId.getValue());\r
+\r
+ return delegate.syncup(flowcapableNodePath, configTree, operationalTree);\r
+ }\r
+\r
+ static String threadName() {\r
+ final Thread currentThread = Thread.currentThread();\r
+ return currentThread.getName();\r
+ }\r
+\r
+ protected String updateThreadName(NodeId nodeId) {\r
+ final Thread currentThread = Thread.currentThread();\r
+ final String oldName = currentThread.getName();\r
+ try {\r
+ if (oldName.startsWith(SyncReactorFutureDecorator.FRM_RPC_CLIENT_PREFIX)) {\r
+ currentThread.setName(oldName + "@" + nodeId.getValue());\r
+ } else {\r
+ LOG.warn("try to update foreign thread name {} {}", nodeId, oldName);\r
+ }\r
+ } catch (Exception e) {\r
+ LOG.error("failed updating threadName {}", nodeId, e);\r
+ }\r
+ return oldName;\r
+ }\r
+\r
+ protected String updateThreadName(String name) {\r
+ final Thread currentThread = Thread.currentThread();\r
+ final String oldName = currentThread.getName();\r
+ try {\r
+ if (oldName.startsWith(SyncReactorFutureDecorator.FRM_RPC_CLIENT_PREFIX)) {\r
+ currentThread.setName(name);\r
+ } else {\r
+ LOG.warn("try to update foreign thread name {} {}", oldName, name);\r
+ }\r
+ } catch (Exception e) {\r
+ LOG.error("failed updating threadName {}", name, e);\r
+ }\r
+ return oldName;\r
+ }\r
+}\r
--- /dev/null
+/**\r
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.\r
+ *\r
+ * This program and the accompanying materials are made available under the\r
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
+ * and is available at http://www.eclipse.org/legal/epl-v10.html\r
+ */\r
+\r
+package org.opendaylight.openflowplugin.applications.frsync.impl;\r
+\r
+import java.util.HashMap;\r
+import java.util.Map;\r
+import java.util.concurrent.Semaphore;\r
+\r
+import javax.annotation.concurrent.GuardedBy;\r
+\r
+import org.apache.commons.lang3.tuple.Pair;\r
+import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;\r
+import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;\r
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;\r
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;\r
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+\r
+import com.google.common.util.concurrent.Futures;\r
+import com.google.common.util.concurrent.ListenableFuture;\r
+import com.google.common.util.concurrent.ListeningExecutorService;\r
+\r
+/**\r
+ * Enriches {@link SyncReactorFutureDecorator} with state compression.\r
+ */\r
+public class SyncReactorFutureWithCompressionDecorator extends SyncReactorFutureDecorator {\r
+\r
+ private static final Logger LOG = LoggerFactory.getLogger(SyncReactorFutureWithCompressionDecorator.class);\r
+\r
+ @GuardedBy("beforeCompressionGuard")\r
+ final Map<InstanceIdentifier<FlowCapableNode>, Pair<FlowCapableNode, FlowCapableNode>> beforeCompression =\r
+ new HashMap<>();\r
+ final Semaphore beforeCompressionGuard = new Semaphore(1, false);\r
+\r
+ public SyncReactorFutureWithCompressionDecorator(SyncReactor delegate, ListeningExecutorService executorService) {\r
+ super(delegate, executorService);\r
+ }\r
+\r
+ public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,\r
+ final FlowCapableNode configTree, final FlowCapableNode operationalTree) throws InterruptedException {\r
+ final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);\r
+ LOG.trace("syncup {}", nodeId.getValue());\r
+\r
+ try {\r
+ beforeCompressionGuard.acquire();\r
+\r
+ final boolean newFutureNecessary = updateCompressionState(flowcapableNodePath, configTree, operationalTree);\r
+ if (newFutureNecessary) {\r
+ super.syncup(flowcapableNodePath, configTree, operationalTree);\r
+ }\r
+ return Futures.immediateFuture(true);\r
+ } finally {\r
+ beforeCompressionGuard.release();\r
+ }\r
+ }\r
+\r
+ protected ListenableFuture<Boolean> doSyncupInFuture(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,\r
+ final FlowCapableNode configTree, final FlowCapableNode operationalTree)\r
+ throws InterruptedException {\r
+ final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);\r
+ LOG.trace("doSyncupInFuture {}", nodeId.getValue());\r
+\r
+ final Pair<FlowCapableNode, FlowCapableNode> lastCompressionState =\r
+ removeLastCompressionState(flowcapableNodePath);\r
+ if (lastCompressionState == null) {\r
+ return Futures.immediateFuture(true);\r
+ } else {\r
+ return super.doSyncupInFuture(flowcapableNodePath,\r
+ lastCompressionState.getLeft(), lastCompressionState.getRight());\r
+ }\r
+ }\r
+\r
+ protected boolean updateCompressionState(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,\r
+ final FlowCapableNode configTree, final FlowCapableNode operationalTree) {\r
+ final Pair<FlowCapableNode, FlowCapableNode> previous = beforeCompression.get(flowcapableNodePath);\r
+ if (previous != null) {\r
+ final FlowCapableNode previousOperational = previous.getRight();\r
+ beforeCompression.put(flowcapableNodePath, Pair.of(configTree, previousOperational));\r
+ return false;\r
+ } else {\r
+ beforeCompression.put(flowcapableNodePath, Pair.of(configTree, operationalTree));\r
+ return true;\r
+ }\r
+ }\r
+\r
+ protected Pair<FlowCapableNode/* config */, FlowCapableNode/* operational */> removeLastCompressionState(\r
+ final InstanceIdentifier<FlowCapableNode> flowcapableNodePath) {\r
+ try {\r
+ try {\r
+ beforeCompressionGuard.acquire();\r
+ } catch (InterruptedException e) {\r
+ return null;\r
+ }\r
+\r
+ return beforeCompression.remove(flowcapableNodePath);\r
+ } finally {\r
+ beforeCompressionGuard.release();\r
+ }\r
+ }\r
+}\r
--- /dev/null
+/**\r
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.\r
+ *\r
+ * This program and the accompanying materials are made available under the\r
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
+ * and is available at http://www.eclipse.org/legal/epl-v10.html\r
+ */\r
+\r
+package org.opendaylight.openflowplugin.applications.frsync.impl;\r
+\r
+import java.util.concurrent.Semaphore;\r
+import java.util.concurrent.TimeUnit;\r
+\r
+import javax.annotation.Nullable;\r
+\r
+import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper;\r
+import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;\r
+import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;\r
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;\r
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;\r
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+\r
+import com.google.common.base.Preconditions;\r
+import com.google.common.util.concurrent.FutureCallback;\r
+import com.google.common.util.concurrent.Futures;\r
+import com.google.common.util.concurrent.ListenableFuture;\r
+\r
+/**\r
+ * Decorator for NodeId level syncup locking.\r
+ */\r
+public class SyncReactorGuardDecorator implements SyncReactor {\r
+\r
+ private static final Logger LOG = LoggerFactory.getLogger(SyncReactorGuardDecorator.class);\r
+\r
+ private final SyncReactor delegate;\r
+ private final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper;\r
+\r
+ public SyncReactorGuardDecorator(SyncReactor delegate,\r
+ SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper) {\r
+ this.delegate = delegate;\r
+ this.semaphoreKeeper = semaphoreKeeper;\r
+ }\r
+\r
+ public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,\r
+ final FlowCapableNode configTree, final FlowCapableNode operationalTree) throws InterruptedException {\r
+ final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);\r
+ LOG.trace("syncup {}", nodeId.getValue());\r
+\r
+ final long stampBeforeGuard = System.nanoTime();\r
+ final Semaphore guard = summonGuardAndAcquire(flowcapableNodePath);//TODO handle InteruptedException\r
+\r
+ try {\r
+ final long stampAfterGuard = System.nanoTime();\r
+ if (LOG.isDebugEnabled()) {\r
+ LOG.debug("syncup start {} waiting:{} guard:{} thread:{}", nodeId.getValue(),\r
+ formatNanos(stampAfterGuard - stampBeforeGuard),\r
+ guard, threadName());\r
+ }\r
+ \r
+ final ListenableFuture<Boolean> endResult =\r
+ delegate.syncup(flowcapableNodePath, configTree, operationalTree);//TODO handle InteruptedException\r
+ \r
+ Futures.addCallback(endResult, new FutureCallback<Boolean>() {\r
+ @Override\r
+ public void onSuccess(@Nullable final Boolean result) {\r
+ if (LOG.isDebugEnabled()) {\r
+ final long stampFinished = System.nanoTime();\r
+ LOG.debug("syncup finished {} took:{} rpc:{} wait:{} guard:{}, thread:{}", nodeId.getValue(),\r
+ formatNanos(stampFinished - stampBeforeGuard),\r
+ formatNanos(stampFinished - stampAfterGuard),\r
+ formatNanos(stampAfterGuard - stampBeforeGuard),\r
+ guard, threadName());\r
+ }\r
+ \r
+ lockReleaseForNodeId(nodeId, guard);\r
+ }\r
+ \r
+ @Override\r
+ public void onFailure(final Throwable t) {\r
+ if (LOG.isDebugEnabled()) {\r
+ final long stampFinished = System.nanoTime();\r
+ LOG.warn("syncup failed {} took:{} rpc:{} wait:{} guard:{} thread:{}", nodeId.getValue(),\r
+ formatNanos(stampFinished - stampBeforeGuard),\r
+ formatNanos(stampFinished - stampAfterGuard),\r
+ formatNanos(stampAfterGuard - stampBeforeGuard),\r
+ guard, threadName());\r
+ }\r
+ \r
+ lockReleaseForNodeId(nodeId, guard);\r
+ }\r
+ });\r
+ return endResult;\r
+ } catch(InterruptedException e) {\r
+ lockReleaseForNodeId(nodeId, guard);\r
+ throw e;\r
+ }\r
+ }\r
+\r
+ protected String formatNanos(long nanos) {\r
+ return "'" + TimeUnit.NANOSECONDS.toMillis(nanos) + " ms'";\r
+ }\r
+\r
+ /**\r
+ * get guard\r
+ *\r
+ * @param flowcapableNodePath\r
+ * @return\r
+ */\r
+ protected Semaphore summonGuardAndAcquire(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath)\r
+ throws InterruptedException {\r
+ final Semaphore guard = Preconditions.checkNotNull(semaphoreKeeper.summonGuard(flowcapableNodePath),\r
+ "no guard for " + flowcapableNodePath);\r
+\r
+ if (LOG.isDebugEnabled()) {\r
+ final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);\r
+ try {\r
+ LOG.debug("syncup summon {} guard:{} thread:{}", nodeId.getValue(), guard, threadName());\r
+ } catch (Exception e) {\r
+ LOG.error("error logging guard after summon before aquiring {}", nodeId);\r
+ }\r
+ }\r
+\r
+ guard.acquire();\r
+ return guard;\r
+ }\r
+\r
+ /**\r
+ * unlock per node\r
+ *\r
+ * @param nodeId\r
+ * @param guard\r
+ */\r
+ protected void lockReleaseForNodeId(final NodeId nodeId,\r
+ final Semaphore guard) {\r
+ if (guard == null) {\r
+ return;\r
+ }\r
+ guard.release();\r
+ }\r
+\r
+ static String threadName() {\r
+ final Thread currentThread = Thread.currentThread();\r
+ return currentThread.getName();\r
+ }\r
+\r
+}\r
--- /dev/null
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.util;
+
+import java.util.concurrent.Semaphore;
+
+import javax.annotation.Nonnull;
+
+import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
+/**
+ * Key-based semaphore provider.
+ */
+public class SemaphoreKeeperGuavaImpl<K> implements SemaphoreKeeper<K> {
+
+ private LoadingCache<K, Semaphore> semaphoreCache;
+
+ public SemaphoreKeeperGuavaImpl(final int permits, final boolean fair) {
+ semaphoreCache = CacheBuilder.newBuilder()
+ .concurrencyLevel(1)
+ .weakValues()
+ .build(new CacheLoader<K, Semaphore>() {
+ @Override
+ public Semaphore load(final K key) throws Exception {
+ return new Semaphore(permits, fair) {
+ private static final long serialVersionUID = 1L;
+ };
+ }
+ });
+ }
+
+ @Override
+ public Semaphore summonGuard(final @Nonnull K key) {
+ return semaphoreCache.getUnchecked(key);
+ }
+
+ @Override
+ public String toString() {
+ return super.toString() + " size:" + (semaphoreCache == null ? null : semaphoreCache.size()) + " " + semaphoreCache;
+ }
+}