* Abstract Listener for node changes.
*/
public abstract class AbstractFrmSyncListener<T extends DataObject> implements NodeListener<T> {
+
private static final Logger LOG = LoggerFactory.getLogger(AbstractFrmSyncListener.class);
@Override
final ListenableFuture<Boolean> future = optFuture.get();
final Boolean ret = future.get(15000, TimeUnit.MILLISECONDS);
if (LOG.isTraceEnabled()) {
- LOG.trace("syncup return [{}] for {} from {} listener", ret, nodeId.getValue(), dsType());
+ LOG.trace("Syncup return [{}] for {} from {} listener", ret, nodeId.getValue(), dsType());
}
}
} catch (InterruptedException e) {
* Listens to config changes and delegates sync entry to {@link SyncReactor}.
*/
public class SimplifiedConfigListener extends AbstractFrmSyncListener<FlowCapableNode> {
+
private static final Logger LOG = LoggerFactory.getLogger(SimplifiedConfigListener.class);
private final SyncReactor reactor;
private final FlowCapableNodeSnapshotDao configSnapshot;
* Listens to operational changes and starts reconciliation through {@link SyncReactor} when necessary.
*/
public class SimplifiedOperationalListener extends AbstractFrmSyncListener<Node> {
+
private static final Logger LOG = LoggerFactory.getLogger(SimplifiedOperationalListener.class);
public static final String DATE_AND_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
-
private final SyncReactor reactor;
private final FlowCapableNodeSnapshotDao operationalSnapshot;
private final FlowCapableNodeDao configDao;
* Decorator for cluster related issues.
*/
public class SyncReactorClusterDecorator implements SyncReactor {
+
private static final Logger LOG = LoggerFactory.getLogger(SyncReactorClusterDecorator.class);
private final SyncReactor delegate;
private final DeviceMastershipManager deviceMastershipManager;
public class SyncReactorFutureDecorator implements SyncReactor {
private static final Logger LOG = LoggerFactory.getLogger(SyncReactorFutureDecorator.class);
- public static final String FRM_RPC_CLIENT_PREFIX = "FRM-RPC-client-";
+ public static final String FRM_RPC_CLIENT_PREFIX = "FRS-executor-";
private final SyncReactor delegate;
private final ListeningExecutorService executorService;
final SyncupEntry syncupEntry) throws InterruptedException {
final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
return executorService.submit(() -> {
- final String oldThreadName = updateThreadName(nodeId);
try {
- final Boolean ret = doSyncupInFuture(flowcapableNodePath, syncupEntry).get(10000, TimeUnit.MILLISECONDS);
- return ret;
+ final Boolean futureResult = doSyncupInFuture(flowcapableNodePath, syncupEntry)
+ .get(10000, TimeUnit.MILLISECONDS);
+ return futureResult;
} catch (TimeoutException e) {
- LOG.warn("doSyncupInFuture timeout occured {}", nodeId.getValue(), e);
- return false;
- } finally {
- updateThreadName(oldThreadName);
+ LOG.warn("Syncup future timeout occured {}", nodeId.getValue(), e);
+ return Boolean.FALSE;
}
});
}
final SyncupEntry syncupEntry) throws InterruptedException {
return delegate.syncup(flowcapableNodePath, syncupEntry);
}
-
- private String updateThreadName(final NodeId nodeId) {
- final Thread currentThread = Thread.currentThread();
- final String oldName = currentThread.getName();
- if (oldName.startsWith(SyncReactorFutureDecorator.FRM_RPC_CLIENT_PREFIX)) {
- currentThread.setName(oldName + "@" + nodeId.getValue());
- } else {
- LOG.warn("Try to update foreign thread name {} {}", nodeId, oldName);
- }
- return oldName;
- }
-
- private void updateThreadName(final String name) {
- final Thread currentThread = Thread.currentThread();
- final String oldName = currentThread.getName();
- if (oldName.startsWith(SyncReactorFutureDecorator.FRM_RPC_CLIENT_PREFIX)) {
- currentThread.setName(name);
- } else {
- LOG.warn("Try to update foreign thread name {} {}", oldName, name);
- }
- }
}
*/
public class SyncReactorFutureZipDecorator extends SyncReactorFutureDecorator {
- private static final Logger LOG = LoggerFactory.getLogger(SyncReactorFutureZipDecorator.class);
-
@GuardedBy("compressionGuard")
private final Map<InstanceIdentifier<FlowCapableNode>, SyncupEntry> compressionQueue = new HashMap<>();
private final Semaphore compressionGuard = new Semaphore(1, false);
if (newTaskNecessary) {
super.syncup(flowcapableNodePath, syncupEntry);
}
- return Futures.immediateFuture(true);
+ return Futures.immediateFuture(Boolean.TRUE);
} finally {
compressionGuard.release();
}
final SyncupEntry syncupEntry) throws InterruptedException {
final SyncupEntry lastCompressionState = removeLastCompressionState(flowcapableNodePath);
if (lastCompressionState == null) {
- return Futures.immediateFuture(true);
+ return Futures.immediateFuture(Boolean.TRUE);
} else {
return super.doSyncupInFuture(flowcapableNodePath, lastCompressionState);
}
public class SyncReactorGuardDecorator implements SyncReactor {
private static final Logger LOG = LoggerFactory.getLogger(SyncReactorGuardDecorator.class);
-
private final SyncReactor delegate;
private final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper;
final long stampBeforeGuard = System.nanoTime();
final Semaphore guard = summonGuardAndAcquire(flowcapableNodePath);
if (guard == null) {
- return Futures.immediateFuture(false);
+ return Futures.immediateFuture(Boolean.FALSE);
}
final long stampAfterGuard = System.nanoTime();
try {
if (LOG.isDebugEnabled()) {
- LOG.debug("syncup start {} waiting:{} guard:{} thread:{}",
- nodeId.getValue(),
- formatNanos(stampAfterGuard - stampBeforeGuard),
- guard,
- threadName());
+ LOG.debug("Syncup guard acquired and running for {} ", nodeId.getValue());
}
-
final ListenableFuture<Boolean> endResult = delegate.syncup(flowcapableNodePath, syncupEntry);
Futures.addCallback(endResult, createSyncupCallback(guard, stampBeforeGuard, stampAfterGuard, nodeId));
return endResult;
} catch (InterruptedException e) {
- releaseGuardForNodeId(guard);
+ releaseGuard(guard);
throw e;
}
}
public void onSuccess(@Nullable final Boolean result) {
if (LOG.isDebugEnabled()) {
final long stampFinished = System.nanoTime();
- LOG.debug("syncup finished {} took:{} rpc:{} wait:{} guard:{} permits thread:{}", nodeId.getValue(),
+ LOG.debug("Syncup finished {} took:{} rpc:{} wait:{}", nodeId.getValue(),
formatNanos(stampFinished - stampBeforeGuard), formatNanos(stampFinished - stampAfterGuard),
- formatNanos(stampAfterGuard - stampBeforeGuard), guard.availablePermits(), threadName());
+ formatNanos(stampAfterGuard - stampBeforeGuard));
}
- releaseGuardForNodeId(guard);
+ releaseGuard(guard);
}
@Override
public void onFailure(final Throwable t) {
final long stampFinished = System.nanoTime();
- LOG.warn("syncup failed {} took:{} rpc:{} wait:{} guard:{} permits thread:{}", nodeId.getValue(),
+ LOG.warn("Syncup failed {} took:{} rpc:{} wait:{}", nodeId.getValue(),
formatNanos(stampFinished - stampBeforeGuard), formatNanos(stampFinished - stampAfterGuard),
- formatNanos(stampAfterGuard - stampBeforeGuard), guard.availablePermits(), threadName());
- releaseGuardForNodeId(guard);
+ formatNanos(stampAfterGuard - stampBeforeGuard));
+ releaseGuard(guard);
}};
}
private Semaphore summonGuardAndAcquire(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath) {
final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
final Semaphore guard = Preconditions.checkNotNull(semaphoreKeeper.summonGuard(flowcapableNodePath),
- "no guard for " + nodeId.getValue());
+ "No guard for " + nodeId.getValue());
try {
guard.acquire();
} catch (InterruptedException e) {
- LOG.warn("syncup summon {} failed {}", nodeId.getValue(), e);
+ LOG.warn("Syncup summon {} failed {}", nodeId.getValue(), e);
return null;
}
if (LOG.isTraceEnabled()) {
- LOG.trace("syncup summon {} guard:{} thread:{}", nodeId.getValue(), guard, threadName());
+ LOG.trace("Syncup summon {} guard:{}", nodeId.getValue(), guard);
}
return guard;
}
* Unlock and release guard.
* @param guard semaphore guard which should be unlocked
*/
- private static void releaseGuardForNodeId(final Semaphore guard) {
+ private static void releaseGuard(final Semaphore guard) {
if (guard != null) {
guard.release();
if (LOG.isTraceEnabled()) {
- LOG.trace("syncup release guard:{} thread:{}", guard, threadName());
+ LOG.trace("Syncup release guard:{} thread:{}", guard);
}
}
}
-
- private static String threadName() {
- final Thread currentThread = Thread.currentThread();
- return currentThread.getName();
- }
}
groupsToAddOrUpdate, metersToAddOrUpdate, flowsToAddOrUpdate,
flowsToRemove, metersToRemove, groupsToRemove);
- counters.setStartNano(System.nanoTime());
final ListenableFuture<RpcResult<Void>> bootstrapResultFuture = RpcResultBuilder.<Void>success().buildFuture();
final ListenableFuture<RpcResult<Void>> resultVehicle = syncPlanPushStrategy.executeSyncStrategy(
bootstrapResultFuture, input, counters);
final CrudCounts meterCrudCounts = counters.getMeterCrudCounts();
final CrudCounts groupCrudCounts = counters.getGroupCrudCounts();
LOG.debug("syncup outcome[{}] (added/updated/removed): flow={}/{}/{}, group={}/{}/{}, " +
- "meter={}/{}/{}, took={} ms, errors={}",
+ "meter={}/{}/{}, errors={}",
nodeId.getValue(),
flowCrudCounts.getAdded(), flowCrudCounts.getUpdated(), flowCrudCounts.getRemoved(),
groupCrudCounts.getAdded(), groupCrudCounts.getUpdated(), groupCrudCounts.getRemoved(),
meterCrudCounts.getAdded(), meterCrudCounts.getUpdated(), meterCrudCounts.getRemoved(),
- TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - counters.getStartNano()),
Arrays.toString(input.getErrors().toArray()));
}
return input.isSuccessful();
public class SyncReactorRetryDecorator implements SyncReactor {
private static final Logger LOG = LoggerFactory.getLogger(SyncReactorRetryDecorator.class);
-
private final SyncReactor delegate;
private final ReconciliationRegistry reconciliationRegistry;
final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
if (syncupEntry.isOptimizedConfigDelta() && reconciliationRegistry.isRegistered(nodeId)) {
LOG.debug("Config change ignored because {} is in reconcile.", nodeId.getValue());
- return Futures.immediateFuture(Boolean.FALSE);
+ return Futures.immediateFuture(Boolean.TRUE);
}
ListenableFuture<Boolean> syncupResult = delegate.syncup(flowcapableNodePath,syncupEntry);
public Boolean apply(Boolean result) {
if (result) {
reconciliationRegistry.unregisterIfRegistered(nodeId);
- return true;
} else {
reconciliationRegistry.register(nodeId);
- return false;
}
+ return result;
}
});
}
private final CrudCounts flowCrudCounts;
private final CrudCounts groupCrudCounts;
private final CrudCounts meterCrudCounts;
- private long startNano;
public SyncCrudCounters() {
flowCrudCounts = new CrudCounts();
return meterCrudCounts;
}
-
- public long getStartNano() {
- return startNano;
- }
-
- public void setStartNano(final long startNano) {
- this.startNano = startNano;
- }
-
public void resetAll() {
getGroupCrudCounts().setUpdated(0);
getGroupCrudCounts().setAdded(0);
public void setUp() {
final ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
.setDaemon(false)
- .setNameFormat("frsync-test%d")
+ .setNameFormat(SyncReactorFutureDecorator.FRM_RPC_CLIENT_PREFIX)
.setUncaughtExceptionHandler((thread, e) -> LOG.error("Uncaught exception {}", thread, e))
.build());
syncThreadPool = MoreExecutors.listeningDecorator(executorService);