+ //cache doesnt exist, create one
+ rpcCache = (ConcurrentMap<I,LinkedHashSet<R>>) clusterGlobalServices.createCache(RPC_CACHE,
+ EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
+ log.debug("Cache created [{}] ", RPC_CACHE);
+ }
+
+
+ /**
+ * Function called by the dependency manager when all the required
+ * dependencies are satisfied
+ */
+ void init(Component c) {
+ try {
+
+ findOrCreateGlobalRpcCache();
+ findOrCreateRpcCache();
+
+ } catch (CacheExistException|CacheConfigException|CacheListenerAddException e) {
+ throw new IllegalStateException("could not construct routing table cache");
+ }
+ }
+
+ /**
+ * Useful for unit testing <note>It has package
+ * scope</note>
+ */
+ ConcurrentMap getGlobalRpcCache() {
+ return this.globalRpcCache;
+ }
+
+ /**
+ * Useful for unit testing <note>It has package
+ * scope</note>
+ */
+ ConcurrentMap getRpcCache() {
+ return this.rpcCache;
+ }
+
+ /**
+ * This is used from integration test NP rest API to check out the result of the
+ * cache population
+ * <Note> For testing purpose only-- use it wisely</Note>
+ *
+ * @return
+ */
+ public String dumpGlobalRpcCache() {
+ Set<Map.Entry<I, R>> cacheEntrySet = this.globalRpcCache.entrySet();
+ StringBuilder sb = new StringBuilder();
+ for (Map.Entry<I, R> entry : cacheEntrySet) {
+ sb.append("Key:").append(entry.getKey()).append("---->Value:")
+ .append((entry.getValue() != null) ? entry.getValue() : "null")
+ .append("\n");
+ }
+ return sb.toString();
+ }
+
+ public String dumpRpcCache() {
+ Set<Map.Entry<I, LinkedHashSet<R>>> cacheEntrySet = this.rpcCache.entrySet();
+ StringBuilder sb = new StringBuilder();
+ for (Map.Entry<I, LinkedHashSet<R>> entry : cacheEntrySet) {
+ sb.append("Key:").append(entry.getKey()).append("---->Value:")
+ .append((entry.getValue() != null) ? entry.getValue() : "null")
+ .append("\n");
+ }
+ return sb.toString();
+ }
+ /**
+ * Invoked when a new entry is available in the cache, the key is only
+ * provided, the value will come as an entryUpdate invocation
+ *
+ * @param key Key for the entry just created
+ * @param cacheName name of the cache for which update has been received
+ * @param originLocal true if the event is generated from this node
+ */
+ @Override
+ public void entryCreated(I key, String cacheName, boolean originLocal) {
+ // TBD: do we require this.
+ if (log.isDebugEnabled()) {
+ log.debug("RoutingTableUpdates: entryCreated routeId = " + key + " cacheName=" + cacheName);
+ }
+ }
+
+ /**
+ * Called anytime a given entry is updated
+ *
+ * @param key Key for the entry modified
+ * @param new_value the new value the key will have
+ * @param cacheName name of the cache for which update has been received
+ * @param originLocal true if the event is generated from this node
+ */
+ @Override
+ public void entryUpdated(I key, R new_value, String cacheName, boolean originLocal) {
+ if (log.isDebugEnabled()) {
+ log.debug("RoutingTableUpdates: entryUpdated routeId = " + key + ",value = " + new_value
+ + " ,cacheName=" + cacheName + " originLocal=" + originLocal);
+ }
+// if (!originLocal) {
+// for (RouteChangeListener rcl : routeChangeListeners) {
+// rcl.onRouteUpdated(key, new_value);
+// }
+// }
+ }
+
+ /**
+ * Called anytime a given key is removed from the ConcurrentHashMap we are
+ * listening to.
+ *
+ * @param key Key of the entry removed
+ * @param cacheName name of the cache for which update has been received
+ * @param originLocal true if the event is generated from this node
+ */
+ @Override
+ public void entryDeleted(I key, String cacheName, boolean originLocal) {
+ if (log.isDebugEnabled()) {
+ log.debug("RoutingTableUpdates: entryUpdated routeId = " + key + " local = " + originLocal
+ + " cacheName=" + cacheName + " originLocal=" + originLocal);