Bug 8764: Fix handling of old negative
[lispflowmapping.git] / mappingservice / implementation / src / main / java / org / opendaylight / lispflowmapping / implementation / lisp / MapServer.java
1 /*
2  * Copyright (c) 2014, 2017 Contextream, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.lispflowmapping.implementation.lisp;
10
11 import com.google.common.base.Preconditions;
12 import com.google.common.collect.Maps;
13 import com.google.common.collect.Sets;
14 import com.google.common.util.concurrent.ThreadFactoryBuilder;
15 import java.net.InetAddress;
16 import java.net.NetworkInterface;
17 import java.net.SocketException;
18 import java.util.ArrayList;
19 import java.util.Arrays;
20 import java.util.Enumeration;
21 import java.util.HashSet;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Objects;
26 import java.util.Set;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.ScheduledExecutorService;
29 import java.util.concurrent.ScheduledFuture;
30 import java.util.concurrent.ThreadFactory;
31 import java.util.concurrent.TimeUnit;
32 import org.apache.commons.lang3.BooleanUtils;
33 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
34 import org.opendaylight.lispflowmapping.config.ConfigIni;
35 import org.opendaylight.lispflowmapping.implementation.util.LoggingUtil;
36 import org.opendaylight.lispflowmapping.implementation.util.MSNotificationInputUtil;
37 import org.opendaylight.lispflowmapping.interfaces.dao.SubKeys;
38 import org.opendaylight.lispflowmapping.interfaces.dao.Subscriber;
39 import org.opendaylight.lispflowmapping.interfaces.lisp.IMapNotifyHandler;
40 import org.opendaylight.lispflowmapping.interfaces.lisp.IMapServerAsync;
41 import org.opendaylight.lispflowmapping.interfaces.lisp.ISmrNotificationListener;
42 import org.opendaylight.lispflowmapping.interfaces.lisp.SmrEvent;
43 import org.opendaylight.lispflowmapping.interfaces.mappingservice.IMappingService;
44 import org.opendaylight.lispflowmapping.lisp.authentication.LispAuthenticationUtil;
45 import org.opendaylight.lispflowmapping.lisp.type.LispMessage;
46 import org.opendaylight.lispflowmapping.lisp.type.MappingData;
47 import org.opendaylight.lispflowmapping.lisp.util.LispAddressStringifier;
48 import org.opendaylight.lispflowmapping.lisp.util.LispAddressUtil;
49 import org.opendaylight.lispflowmapping.lisp.util.MapNotifyBuilderHelper;
50 import org.opendaylight.lispflowmapping.lisp.util.MapRequestUtil;
51 import org.opendaylight.lispflowmapping.lisp.util.MappingRecordUtil;
52 import org.opendaylight.lispflowmapping.lisp.util.SourceDestKeyHelper;
53 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.PortNumber;
54 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.lisp.address.types.rev151105.lisp.address.address.SourceDestKey;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.inet.binary.types.rev160303.IpAddressBinary;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.MapRegister;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.SiteId;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.eid.container.Eid;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.eid.list.EidItem;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.eid.list.EidItemBuilder;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.mapnotifymessage.MapNotifyBuilder;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.mapping.authkey.container.MappingAuthkey;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.mapping.record.container.MappingRecord;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.mapping.record.list.MappingRecordItem;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.mapping.record.list.MappingRecordItemBuilder;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.maprequestnotification.MapRequestBuilder;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.transport.address.TransportAddress;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.transport.address.TransportAddressBuilder;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.mappingservice.rev150906.MappingChanged;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.mappingservice.rev150906.MappingOrigin;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.mappingservice.rev150906.OdlMappingserviceListener;
72 import org.opendaylight.yangtools.concepts.ListenerRegistration;
73 import org.slf4j.Logger;
74 import org.slf4j.LoggerFactory;
75
76 public class MapServer implements IMapServerAsync, OdlMappingserviceListener, ISmrNotificationListener {
77
78     protected static final Logger LOG = LoggerFactory.getLogger(MapServer.class);
79     private static final byte[] ALL_ZEROES_XTR_ID = new byte[] {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 ,0};
80     private IMappingService mapService;
81     private boolean subscriptionService;
82     private IMapNotifyHandler notifyHandler;
83     private NotificationService notificationService;
84     private ListenerRegistration<MapServer> mapServerListenerRegistration;
85     private SmrScheduler scheduler;
86
87     public MapServer(IMappingService mapService, boolean subscriptionService,
88                      IMapNotifyHandler notifyHandler, NotificationService notificationService) {
89         Preconditions.checkNotNull(mapService);
90         this.mapService = mapService;
91         this.subscriptionService = subscriptionService;
92         this.notifyHandler = notifyHandler;
93         this.notificationService = notificationService;
94         if (notificationService != null) {
95             notificationService.registerNotificationListener(this);
96         }
97         scheduler = new SmrScheduler();
98     }
99
100     @Override
101     public void setSubscriptionService(boolean subscriptionService) {
102         this.subscriptionService = subscriptionService;
103     }
104
105     @SuppressWarnings("unchecked")
106     public void handleMapRegister(MapRegister mapRegister) {
107         boolean mappingUpdated = false;
108         boolean merge = ConfigIni.getInstance().mappingMergeIsSet() && mapRegister.isMergeEnabled();
109         Set<Subscriber> subscribers = Sets.newConcurrentHashSet();
110         MappingRecord oldMapping;
111
112         if (merge) {
113             if (!mapRegister.isXtrSiteIdPresent() || mapRegister.getXtrId() == null) {
114                 LOG.error("Merge bit is set in Map-Register, but xTR-ID is not present. Will not merge.");
115                 merge = false;
116             } else if (Arrays.equals(mapRegister.getXtrId().getValue(), ALL_ZEROES_XTR_ID)) {
117                 LOG.warn("Merge bit is set in Map-Register, but xTR-ID is all zeroes.");
118             }
119         }
120
121         for (MappingRecordItem record : mapRegister.getMappingRecordItem()) {
122             MappingRecord mapping = record.getMappingRecord();
123             Eid eid = mapping.getEid();
124             MappingData mappingData = new MappingData(mapping, System.currentTimeMillis());
125             mappingData.setMergeEnabled(merge);
126             mappingData.setXtrId(mapRegister.getXtrId());
127
128             oldMapping = getMappingRecord(mapService.getMapping(MappingOrigin.Southbound, eid));
129             mapService.addMapping(MappingOrigin.Southbound, eid, getSiteId(mapRegister), mappingData);
130             if (oldMapping != null
131                     && MappingRecordUtil.isNegativeMapping(oldMapping)
132                     && !oldMapping.getEid().equals(eid)) {
133                 if (subscriptionService) {
134                     // Here we save the subscribers of the OLD mapping before removing. We will add to this set the
135                     // subscribers of the NEW mapping below (since the EIDs are different, the result of
136                     // mappingChanged() will be true, and then send an SMR to all subscribers with the EID of the NEW
137                     // mapping only.
138                     Set<Subscriber> oldMappingSubscribers = getSubscribers(oldMapping.getEid());
139                     if (oldMappingSubscribers != null) {
140                         subscribers.addAll(oldMappingSubscribers);
141                         LoggingUtil.logSubscribers(LOG, oldMapping.getEid(), subscribers);
142                     }
143                 }
144                 mapService.removeMapping(MappingOrigin.Southbound, oldMapping.getEid());
145             }
146
147             if (subscriptionService) {
148                 MappingRecord newMapping = merge
149                         ? getMappingRecord(mapService.getMapping(MappingOrigin.Southbound, eid)) : mapping;
150
151                 if (mappingChanged(oldMapping, newMapping)) {
152                     if (LOG.isDebugEnabled()) {
153                         LOG.debug("Mapping update occured for {} SMRs will be sent for its subscribers.",
154                                 LispAddressStringifier.getString(eid));
155                     }
156                     Set<Subscriber> newMappingSubscribers = getSubscribers(eid);
157                     if (oldMapping != null && !oldMapping.getEid().equals(eid)) {
158                         newMappingSubscribers = addParentSubscribers(eid, newMappingSubscribers);
159                     }
160                     if (newMappingSubscribers != null) {
161                         subscribers.addAll(newMappingSubscribers);
162                         LoggingUtil.logSubscribers(LOG, eid, subscribers);
163                     }
164                     handleSmr(eid, subscribers);
165                     mappingUpdated = true;
166                 }
167             }
168         }
169         if (BooleanUtils.isTrue(mapRegister.isWantMapNotify())) {
170             LOG.trace("MapRegister wants MapNotify");
171             MapNotifyBuilder builder = new MapNotifyBuilder();
172             List<TransportAddress> rlocs = null;
173             if (merge) {
174                 Set<IpAddressBinary> notifyRlocs = new HashSet<IpAddressBinary>();
175                 List<MappingRecordItem> mergedMappings = new ArrayList<MappingRecordItem>();
176                 for (MappingRecordItem record : mapRegister.getMappingRecordItem()) {
177                     MappingRecord mapping = record.getMappingRecord();
178                     MappingRecord currentRecord = getMappingRecord(mapService.getMapping(MappingOrigin.Southbound,
179                             mapping.getEid()));
180                     mergedMappings.add(new MappingRecordItemBuilder().setMappingRecord(currentRecord).build());
181                     Set<IpAddressBinary> sourceRlocs = (Set<IpAddressBinary>) mapService.getData(
182                             MappingOrigin.Southbound, mapping.getEid(), SubKeys.SRC_RLOCS);
183                     if (sourceRlocs != null) {
184                         notifyRlocs.addAll(sourceRlocs);
185                     }
186                 }
187                 MapNotifyBuilderHelper.setFromMapRegisterAndMappingRecordItems(builder, mapRegister, mergedMappings);
188                 // send map-notify to merge group only when mapping record is changed
189                 if (mappingUpdated) {
190                     rlocs = getTransportAddresses(notifyRlocs);
191                 }
192             } else {
193                 MapNotifyBuilderHelper.setFromMapRegister(builder, mapRegister);
194             }
195             List<MappingRecordItem> mappings = builder.getMappingRecordItem();
196             if (mappings != null && mappings.get(0) != null && mappings.get(0).getMappingRecord() != null
197                     && mappings.get(0).getMappingRecord().getEid() != null) {
198                 MappingAuthkey authkey = mapService.getAuthenticationKey(mappings.get(0).getMappingRecord().getEid());
199                 if (authkey != null) {
200                     builder.setAuthenticationData(LispAuthenticationUtil.createAuthenticationData(builder.build(),
201                             authkey.getKeyString()));
202                 }
203             }
204             notifyHandler.handleMapNotify(builder.build(), rlocs);
205         }
206     }
207
208     private static List<TransportAddress> getTransportAddresses(Set<IpAddressBinary> addresses) {
209         List<TransportAddress> rlocs = new ArrayList<TransportAddress>();
210         for (IpAddressBinary address : addresses) {
211             TransportAddressBuilder tab = new TransportAddressBuilder();
212             tab.setIpAddress(address);
213             tab.setPort(new PortNumber(LispMessage.PORT_NUM));
214             rlocs.add(tab.build());
215         }
216         return rlocs;
217     }
218
219     private static SiteId getSiteId(MapRegister mapRegister) {
220         return (mapRegister.getSiteId() != null) ? new SiteId(mapRegister.getSiteId()) : null;
221     }
222
223     private static MappingRecord getMappingRecord(MappingData mappingData) {
224         return (mappingData != null) ? mappingData.getRecord() : null;
225     }
226
227     @Override
228     public void onMappingChanged(MappingChanged notification) {
229         LOG.trace("MappingChanged event of type: `{}'", notification.getChangeType());
230         if (subscriptionService) {
231             Eid eid = notification.getMappingRecord().getEid();
232             Set<Subscriber> subscribers = MSNotificationInputUtil.toSubscriberSet(notification.getSubscriberItem());
233             LoggingUtil.logSubscribers(LOG, eid, subscribers);
234             if (mapService.isMaster()) {
235                 sendSmrs(eid, subscribers);
236                 if (eid.getAddress() instanceof SourceDestKey) {
237                     Set<Subscriber> dstSubscribers = MSNotificationInputUtil.toSubscriberSetFromDst(
238                             notification.getDstSubscriberItem());
239                     LoggingUtil.logSubscribers(LOG, SourceDestKeyHelper.getDstBinary(eid), dstSubscribers);
240                     sendSmrs(SourceDestKeyHelper.getDstBinary(eid), dstSubscribers);
241                 }
242             }
243         }
244     }
245
246     private static boolean mappingChanged(MappingRecord oldMapping, MappingRecord newMapping) {
247         // We only check for fields we care about
248         // XXX: This code needs to be checked and updated when the YANG model for MappingRecord is modified
249         Preconditions.checkNotNull(newMapping, "The new mapping should never be null");
250         if (oldMapping == null) {
251             LOG.trace("mappingChanged(): old mapping is null");
252             return true;
253         } else if (!Objects.equals(oldMapping.getEid(), newMapping.getEid())) {
254             LOG.trace("mappingChanged(): EID");
255             return true;
256         } else if (!Objects.equals(oldMapping.getLocatorRecord(), newMapping.getLocatorRecord())) {
257             LOG.trace("mappingChanged(): RLOC");
258             return true;
259         } else if (!Objects.equals(oldMapping.getAction(), newMapping.getAction())) {
260             LOG.trace("mappingChanged(): action");
261             return true;
262         } else if (!Objects.equals(oldMapping.getRecordTtl(), newMapping.getRecordTtl())) {
263             LOG.trace("mappingChanged(): TTL");
264             return true;
265         } else if (!Objects.equals(oldMapping.getMapVersion(), newMapping.getMapVersion())) {
266             LOG.trace("mappingChanged(): mapping version");
267             return true;
268         }
269         return false;
270     }
271
272     private void handleSmr(Eid eid, Set<Subscriber> subscribers) {
273         sendSmrs(eid, subscribers);
274
275         // For SrcDst LCAF also send SMRs to Dst prefix
276         if (eid.getAddress() instanceof SourceDestKey) {
277             Eid dstAddr = SourceDestKeyHelper.getDstBinary(eid);
278             Set<Subscriber> dstSubs = getSubscribers(dstAddr);
279             sendSmrs(dstAddr, dstSubs);
280         }
281     }
282
283     private void sendSmrs(Eid eid, Set<Subscriber> subscribers) {
284         if (subscribers == null) {
285             return;
286         }
287         final MapRequestBuilder mrb = MapRequestUtil.prepareSMR(eid, LispAddressUtil.toRloc(getLocalAddress()));
288         LOG.trace("Built SMR packet: " + mrb.build().toString());
289
290         scheduler.scheduleSmrs(mrb, subscribers.iterator());
291     }
292
293     @SuppressWarnings("unchecked")
294     private Set<Subscriber> getSubscribers(Eid address) {
295         return (Set<Subscriber>) mapService.getData(MappingOrigin.Southbound, address, SubKeys.SUBSCRIBERS);
296     }
297
298     private Set<Subscriber> addParentSubscribers(Eid eid, Set<Subscriber> subscribers) {
299         Eid parentPrefix = mapService.getParentPrefix(eid);
300         if (parentPrefix == null) {
301             return subscribers;
302         }
303
304         Set<Subscriber> parentSubscribers = getSubscribers(parentPrefix);
305         if (parentSubscribers != null) {
306             if (subscribers != null) {
307                 subscribers.addAll(parentSubscribers);
308             } else {
309                 subscribers = parentSubscribers;
310             }
311         }
312         return subscribers;
313     }
314
315     private void addSubscribers(Eid address, Set<Subscriber> subscribers) {
316         mapService.addData(MappingOrigin.Southbound, address, SubKeys.SUBSCRIBERS, subscribers);
317     }
318
319     private static InetAddress getLocalAddress() {
320         try {
321             Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
322             while (interfaces.hasMoreElements()) {
323                 NetworkInterface current = interfaces.nextElement();
324                 LOG.trace("Interface " + current.toString());
325                 if (!current.isUp() || current.isLoopback() || current.isVirtual()) {
326                     continue;
327                 }
328                 Enumeration<InetAddress> addresses = current.getInetAddresses();
329                 while (addresses.hasMoreElements()) {
330                     InetAddress currentAddr = addresses.nextElement();
331                     // Skip loopback and link local addresses
332                     if (currentAddr.isLoopbackAddress() || currentAddr.isLinkLocalAddress()) {
333                         continue;
334                     }
335                     LOG.debug(currentAddr.getHostAddress());
336                     return currentAddr;
337                 }
338             }
339         } catch (SocketException se) {
340             LOG.debug("Caught socket exception", se);
341         }
342         return null;
343     }
344
345     @Override
346     public void onSmrInvokedReceived(SmrEvent event) {
347         scheduler.smrReceived(event);
348     }
349
350     /**
351      * Task scheduler is responsible for resending SMR messages to a subscriber (xTR)
352      * {@value ConfigIni#LISP_SMR_RETRY_COUNT} times, or until {@link ISmrNotificationListener#onSmrInvokedReceived}
353      * is triggered.
354      */
355     private class SmrScheduler {
356         final int cpuCores = Runtime.getRuntime().availableProcessors();
357         private final ThreadFactory threadFactory = new ThreadFactoryBuilder()
358                 .setNameFormat("smr-executor-%d").build();
359         private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(cpuCores * 2, threadFactory);
360         private final Map<Eid, Map<Subscriber, ScheduledFuture<?>>> eidFutureMap = Maps.newConcurrentMap();
361
362         void scheduleSmrs(MapRequestBuilder mrb, Iterator<Subscriber> subscribers) {
363             final Eid srcEid = mrb.getSourceEid().getEid();
364             cancelExistingFuturesForEid(srcEid);
365
366             final Map<Subscriber, ScheduledFuture<?>> subscriberFutureMap = Maps.newConcurrentMap();
367
368             // Using Iterator ensures that we don't get a ConcurrentModificationException when removing a Subscriber
369             // from a Set.
370             while (subscribers.hasNext()) {
371                 Subscriber subscriber = subscribers.next();
372                 if (subscriber.timedOut()) {
373                     LOG.debug("Lazy removing expired subscriber entry " + subscriber.getString());
374                     subscribers.remove();
375                 } else {
376                     final ScheduledFuture<?> future = executor.scheduleAtFixedRate(new CancellableRunnable(
377                             mrb, subscriber), 0L, ConfigIni.getInstance().getSmrTimeout(), TimeUnit.MILLISECONDS);
378                     subscriberFutureMap.put(subscriber, future);
379                 }
380             }
381
382             if (subscriberFutureMap.isEmpty()) {
383                 return;
384             }
385             eidFutureMap.put(srcEid, subscriberFutureMap);
386         }
387
388         void smrReceived(SmrEvent event) {
389             final List<Subscriber> subscriberList = event.getSubscriberList();
390             for (Subscriber subscriber : subscriberList) {
391                 LOG.trace("SMR-invoked event, EID {}, subscriber {}", LispAddressStringifier.getString(event.getEid()),
392                         subscriber.getString());
393                 final Map<Subscriber, ScheduledFuture<?>> subscriberFutureMap = eidFutureMap.get(event.getEid());
394                 if (subscriberFutureMap != null) {
395                     final ScheduledFuture<?> future = subscriberFutureMap.get(subscriber);
396                     if (future != null && !future.isCancelled()) {
397                         future.cancel(true);
398                         LOG.debug("SMR-invoked MapRequest received, scheduled task for subscriber {}, EID {} with"
399                                 + " nonce {} has been cancelled", subscriber.getString(),
400                                 LispAddressStringifier.getString(event.getEid()), event.getNonce());
401                         subscriberFutureMap.remove(subscriber);
402                     }
403                     if (subscriberFutureMap.isEmpty()) {
404                         eidFutureMap.remove(event.getEid());
405                     }
406                 }
407             }
408         }
409
410         private void cancelExistingFuturesForEid(Eid eid) {
411             synchronized (eidFutureMap) {
412                 if (eidFutureMap.containsKey(eid)) {
413                     final Map<Subscriber, ScheduledFuture<?>> subscriberFutureMap = eidFutureMap.get(eid);
414                     Iterator<Subscriber> oldSubscribers = subscriberFutureMap.keySet().iterator();
415                     while (oldSubscribers.hasNext()) {
416                         Subscriber subscriber = oldSubscribers.next();
417                         ScheduledFuture<?> subscriberFuture = subscriberFutureMap.get(subscriber);
418                         subscriberFuture.cancel(true);
419                     }
420                     eidFutureMap.remove(eid);
421                 }
422             }
423         }
424
425         private final class CancellableRunnable implements Runnable {
426             private MapRequestBuilder mrb;
427             private Subscriber subscriber;
428             private int executionCount = 1;
429
430             CancellableRunnable(MapRequestBuilder mrb, Subscriber subscriber) {
431                 this.mrb = mrb;
432                 this.subscriber = subscriber;
433             }
434
435             @SuppressWarnings("checkstyle:IllegalCatch")
436             @Override
437             public void run() {
438                 final Eid srcEid = mrb.getSourceEid().getEid();
439
440                 try {
441                     // The address stored in the SMR's EID record is used as Source EID in the SMR-invoked
442                     // Map-Request. To ensure consistent behavior it is set to the value used to originally request
443                     // a given mapping.
444                     if (executionCount <= ConfigIni.getInstance().getSmrRetryCount()) {
445                         synchronized (mrb) {
446                             mrb.setEidItem(new ArrayList<EidItem>());
447                             mrb.getEidItem().add(new EidItemBuilder().setEid(subscriber.getSrcEid()).build());
448                             notifyHandler.handleSMR(mrb.build(), subscriber.getSrcRloc());
449                             if (LOG.isTraceEnabled()) {
450                                 LOG.trace("Attempt #{} to send SMR to subscriber {} for EID {}",
451                                         executionCount,
452                                         subscriber.getString(),
453                                         LispAddressStringifier.getString(mrb.getSourceEid().getEid()));
454                             }
455                         }
456                     } else {
457                         LOG.trace("Cancelling execution of a SMR Map-Request after {} failed attempts.",
458                                 executionCount - 1);
459                         cancelAndRemove(subscriber, srcEid);
460                         return;
461                     }
462                 } catch (Exception e) {
463                     LOG.error("Errors encountered while handling SMR:", e);
464                     cancelAndRemove(subscriber, srcEid);
465                     return;
466                 }
467                 executionCount++;
468             }
469
470             private void cancelAndRemove(Subscriber subscriber, Eid eid) {
471                 final Map<Subscriber, ScheduledFuture<?>> subscriberFutureMap = eidFutureMap.get(eid);
472                 if (subscriberFutureMap == null) {
473                     LOG.warn("Couldn't find subscriber {} in SMR scheduler internal list", subscriber);
474                     return;
475                 }
476
477                 if (subscriberFutureMap.containsKey(subscriber)) {
478                     ScheduledFuture<?> eidFuture = subscriberFutureMap.get(subscriber);
479                     subscriberFutureMap.remove(subscriber);
480                     eidFuture.cancel(false);
481                 }
482                 if (subscriberFutureMap.isEmpty()) {
483                     eidFutureMap.remove(eid);
484                 }
485             }
486         }
487     }
488 }