/*
 * Copyright (c) 2014 Contextream, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.lispflowmapping.clusterdao;

import java.util.EnumSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.opendaylight.controller.clustering.services.CacheConfigException;
import org.opendaylight.controller.clustering.services.CacheExistException;
import org.opendaylight.controller.clustering.services.IClusterContainerServices;
import org.opendaylight.controller.clustering.services.IClusterServices;
import org.opendaylight.lispflowmapping.interfaces.dao.ILispDAO;
import org.opendaylight.lispflowmapping.interfaces.dao.IRowVisitor;
import org.opendaylight.lispflowmapping.interfaces.dao.MappingEntry;
import org.opendaylight.lispflowmapping.interfaces.dao.RLOCGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
30 public class ClusterDAOService implements ILispDAO {
32 protected static final Logger LOG = LoggerFactory.getLogger(ClusterDAOService.class);
33 private IClusterContainerServices clusterContainerService = null;
34 private ConcurrentMap<Object, ConcurrentMap<String, Object>> data;
35 private final String CACHE_NAME = "mappingServiceCache";
36 private TimeUnit timeUnit = TimeUnit.SECONDS;
37 private int recordTimeOut = 240;
38 private int cleanInterval = 10;
39 private ScheduledExecutorService scheduler;
41 void setClusterContainerService(IClusterContainerServices s) {
42 this.clusterContainerService = s;
45 scheduler = Executors.newScheduledThreadPool(1);
46 scheduler.scheduleAtFixedRate(new Runnable() {
51 }, 0, cleanInterval, timeUnit);
54 void unsetClusterContainerService(IClusterContainerServices s) {
55 LOG.trace("Cluster Service unset");
56 if (this.clusterContainerService == s) {
57 this.clusterContainerService = null;
59 scheduler.shutdownNow();
62 private void allocateCache() {
63 if (this.clusterContainerService == null) {
64 LOG.warn("un-initialized clusterContainerService, can't create cache");
67 LOG.trace("Creating Cache for ClusterDAOService");
69 this.clusterContainerService.createCache(CACHE_NAME, EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
70 } catch (CacheConfigException cce) {
71 LOG.warn("Cache couldn't be created for ClusterDAOService - check cache mode");
72 } catch (CacheExistException cce) {
73 LOG.warn("Cache for ClusterDAOService already exists, destroy and recreate");
75 LOG.trace("Cache successfully created for ClusterDAOService");
78 @SuppressWarnings({ "unchecked" })
79 private void retrieveCache() {
80 if (this.clusterContainerService == null) {
81 LOG.warn("un-initialized clusterContainerService, can't retrieve cache");
84 LOG.trace("Retrieving cache for ClusterDAOService");
85 data = (ConcurrentMap<Object, ConcurrentMap<String, Object>>) this.clusterContainerService.getCache(CACHE_NAME);
87 LOG.warn("Cache couldn't be retrieved for ClusterDAOService");
89 LOG.trace("Cache was successfully retrieved for ClusterDAOService");
92 public void getAll(IRowVisitor visitor) {
93 for (ConcurrentMap.Entry<Object, ConcurrentMap<String, Object>> keyEntry : data.entrySet()) {
94 for (Map.Entry<String, Object> valueEntry : keyEntry.getValue().entrySet()) {
95 visitor.visitRow(keyEntry.getKey(), valueEntry.getKey(), valueEntry.getValue());
100 public void put(Object key, MappingEntry<?>... values) {
101 if (!data.containsKey(key)) {
102 data.put(key, new ConcurrentHashMap<String, Object>());
104 for (MappingEntry<?> entry : values) {
105 data.get(key).put(entry.getKey(), entry.getValue());
109 public void cleanOld() {
110 getAll(new IRowVisitor() {
111 public void visitRow(Object keyId, String valueKey, Object value) {
112 if (value instanceof RLOCGroup) {
113 RLOCGroup rloc = (RLOCGroup) value;
114 if (isExpired(rloc)) {
115 removeSpecific(keyId, valueKey);
120 private boolean isExpired(RLOCGroup rloc) {
121 return System.currentTimeMillis() - rloc.getRegisterdDate().getTime() > TimeUnit.MILLISECONDS.convert(recordTimeOut, timeUnit);
126 public Object getSpecific(Object key, String valueKey) {
127 Map<String, Object> keyToValues = data.get(key);
128 if (keyToValues == null) {
131 return keyToValues.get(valueKey);
134 public Map<String, Object> get(Object key) {
135 return data.get(key);
138 public void remove(Object key) {
142 public void removeSpecific(Object key, String valueKey) {
143 if (data.containsKey(key) && data.get(key).containsKey(valueKey)) {
144 data.get(key).remove(valueKey);
148 public void removeAll() {
152 public TimeUnit getTimeUnit() {
156 public void setRecordTimeOut(int recordTimeOut) {
157 this.recordTimeOut = recordTimeOut;
160 public int getRecordTimeOut() {
161 return recordTimeOut;
164 public void setTimeUnit(TimeUnit timeUnit) {
165 this.timeUnit = timeUnit;