private final ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("smr-executor-%d").build();
private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(cpuCores * 2, threadFactory);
- private final Map<Subscriber, Map<Eid, ScheduledFuture<?>>> subscriberFutureMap = Maps.newConcurrentMap();
+ private final Map<Eid, Map<Subscriber, ScheduledFuture<?>>> eidFutureMap = Maps.newConcurrentMap();
void scheduleSmrs(MapRequestBuilder mrb, Iterator<Subscriber> subscribers) {
+ final Eid srcEid = mrb.getSourceEid().getEid();
+ final Map<Subscriber, ScheduledFuture<?>> subscriberFutureMap = Maps.newConcurrentMap();
+
// Using Iterator ensures that we don't get a ConcurrentModificationException when removing a Subscriber
// from a Set.
while (subscribers.hasNext()) {
LOG.debug("Lazy removing expired subscriber entry " + subscriber.getString());
subscribers.remove();
} else {
- final Eid srcEid = mrb.getSourceEid().getEid();
final ScheduledFuture<?> future = executor.scheduleAtFixedRate(new CancellableRunnable(
mrb, subscriber), 0L, ConfigIni.getInstance().getSmrTimeout(), TimeUnit.MILLISECONDS);
-
- if (subscriberFutureMap.containsKey(subscriber)) {
- subscriberFutureMap.get(subscriber).put(srcEid, future);
- } else {
- final Map<Eid, ScheduledFuture<?>> eidFutureMap = Maps.newConcurrentMap();
- eidFutureMap.put(srcEid, future);
- subscriberFutureMap.put(subscriber, eidFutureMap);
- }
+ subscriberFutureMap.put(subscriber, future);
}
}
+
+ if (subscriberFutureMap.isEmpty()) {
+ return;
+ }
+ eidFutureMap.put(srcEid, subscriberFutureMap);
}
void smrReceived(SmrEvent event) {
for (Subscriber subscriber : subscriberList) {
LOG.trace("SMR-invoked event, EID {}, subscriber {}", LispAddressStringifier.getString(event.getEid()),
subscriber.getString());
- final Map<Eid, ScheduledFuture<?>> eidFutureMap = subscriberFutureMap.get(subscriber);
- if (eidFutureMap != null) {
- final ScheduledFuture<?> future = eidFutureMap.get(event.getEid());
+ final Map<Subscriber, ScheduledFuture<?>> subscriberFutureMap = eidFutureMap.get(event.getEid());
+ if (subscriberFutureMap != null) {
+ final ScheduledFuture<?> future = subscriberFutureMap.get(subscriber);
if (future != null && !future.isCancelled()) {
future.cancel(true);
LOG.debug("SMR-invoked MapRequest received, scheduled task for subscriber {}, EID {} with"
+ " nonce {} has been cancelled", subscriber.getString(),
LispAddressStringifier.getString(event.getEid()), event.getNonce());
- eidFutureMap.remove(event.getEid());
- }
- if (eidFutureMap.isEmpty()) {
subscriberFutureMap.remove(subscriber);
}
+ if (subscriberFutureMap.isEmpty()) {
+ eidFutureMap.remove(event.getEid());
+ }
}
}
}
}
private void cancelAndRemove(Subscriber subscriber, Eid eid) {
- final Map<Eid, ScheduledFuture<?>> eidFutureMap = subscriberFutureMap.get(subscriber);
- if (eidFutureMap == null) {
+ final Map<Subscriber, ScheduledFuture<?>> subscriberFutureMap = eidFutureMap.get(eid);
+ if (subscriberFutureMap == null) {
LOG.warn("Couldn't find subscriber {} in SMR scheduler internal list", subscriber);
return;
}
- if (eidFutureMap.containsKey(eid)) {
- ScheduledFuture<?> eidFuture = eidFutureMap.get(eid);
- eidFutureMap.remove(eid);
+ if (subscriberFutureMap.containsKey(subscriber)) {
+ ScheduledFuture<?> eidFuture = subscriberFutureMap.get(subscriber);
+ subscriberFutureMap.remove(subscriber);
eidFuture.cancel(false);
}
- if (eidFutureMap.isEmpty()) {
- subscriberFutureMap.remove(subscriber);
+ if (subscriberFutureMap.isEmpty()) {
+ eidFutureMap.remove(eid);
}
}
}