public class SyncReactorGuardDecorator implements SyncReactor {
private static final Logger LOG = LoggerFactory.getLogger(SyncReactorGuardDecorator.class);
-
private final SyncReactor delegate;
private final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper;
- public SyncReactorGuardDecorator(SyncReactor delegate,
- SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper) {
+ public SyncReactorGuardDecorator(final SyncReactor delegate,
+ final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper) {
this.delegate = delegate;
this.semaphoreKeeper = semaphoreKeeper;
}
public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
final SyncupEntry syncupEntry) throws InterruptedException {
final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
- LOG.trace("syncup guard decorator: {}", nodeId.getValue());
-
final long stampBeforeGuard = System.nanoTime();
final Semaphore guard = summonGuardAndAcquire(flowcapableNodePath);
if (guard == null) {
- return Futures.immediateFuture(false);
+ return Futures.immediateFuture(Boolean.FALSE);
}
final long stampAfterGuard = System.nanoTime();
try {
if (LOG.isDebugEnabled()) {
- LOG.debug("syncup start {} waiting:{} guard:{} thread:{}", nodeId.getValue(),
- formatNanos(stampAfterGuard - stampBeforeGuard),
- guard, threadName());
+ LOG.debug("Syncup guard acquired and running for {} ", nodeId.getValue());
}
-
- final ListenableFuture<Boolean> endResult =
- delegate.syncup(flowcapableNodePath, syncupEntry);
-
+ final ListenableFuture<Boolean> endResult = delegate.syncup(flowcapableNodePath, syncupEntry);
Futures.addCallback(endResult, createSyncupCallback(guard, stampBeforeGuard, stampAfterGuard, nodeId));
return endResult;
} catch (InterruptedException e) {
- releaseGuardForNodeId(guard);
+ releaseGuard(guard);
throw e;
}
}
public void onSuccess(@Nullable final Boolean result) {
if (LOG.isDebugEnabled()) {
final long stampFinished = System.nanoTime();
- LOG.debug("syncup finished {} took:{} rpc:{} wait:{} guard:{} permits thread:{}", nodeId.getValue(),
+ LOG.debug("Syncup finished {} took:{} rpc:{} wait:{}", nodeId.getValue(),
formatNanos(stampFinished - stampBeforeGuard), formatNanos(stampFinished - stampAfterGuard),
- formatNanos(stampAfterGuard - stampBeforeGuard), guard.availablePermits(), threadName());
+ formatNanos(stampAfterGuard - stampBeforeGuard));
}
- releaseGuardForNodeId(guard);
+ releaseGuard(guard);
}
@Override
public void onFailure(final Throwable t) {
final long stampFinished = System.nanoTime();
- LOG.error("syncup failed {} took:{} rpc:{} wait:{} guard:{} permits thread:{}", nodeId.getValue(),
+ LOG.warn("Syncup failed {} took:{} rpc:{} wait:{}", nodeId.getValue(),
formatNanos(stampFinished - stampBeforeGuard), formatNanos(stampFinished - stampAfterGuard),
- formatNanos(stampAfterGuard - stampBeforeGuard), guard.availablePermits(), threadName());
- releaseGuardForNodeId(guard);
+ formatNanos(stampAfterGuard - stampBeforeGuard));
+ releaseGuard(guard);
}};
}
- private static String formatNanos(long nanos) {
+ private static String formatNanos(final long nanos) {
return "'" + TimeUnit.NANOSECONDS.toMillis(nanos) + " ms'";
}
private Semaphore summonGuardAndAcquire(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath) {
final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
final Semaphore guard = Preconditions.checkNotNull(semaphoreKeeper.summonGuard(flowcapableNodePath),
- "no guard for " + nodeId.getValue());
+ "No guard for " + nodeId.getValue());
try {
guard.acquire();
} catch (InterruptedException e) {
- LOG.error("syncup summon {} failed {}", nodeId.getValue(), e);
+ LOG.warn("Syncup summon {} failed {}", nodeId.getValue(), e);
return null;
}
- LOG.trace("syncup summon {} guard:{} thread:{}", nodeId.getValue(), guard, threadName());
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Syncup summon {} guard:{}", nodeId.getValue(), guard);
+ }
return guard;
}
* Unlock and release guard.
* @param guard semaphore guard which should be unlocked
*/
- private static void releaseGuardForNodeId(final Semaphore guard) {
+ private static void releaseGuard(final Semaphore guard) {
if (guard != null) {
guard.release();
- LOG.trace("syncup release guard:{} thread:{}", guard, threadName());
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Syncup release guard:{} thread:{}", guard);
+ }
}
}
-
- private static String threadName() {
- final Thread currentThread = Thread.currentThread();
- return currentThread.getName();
- }
-
}