package org.opendaylight.controller.clustering.services_implementation.internal;
-import java.io.PrintWriter;
-import java.io.StringWriter;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;
import org.infinispan.Cache;
import org.infinispan.configuration.cache.Configuration;
+import org.infinispan.configuration.cache.ConfigurationBuilder;
+import org.infinispan.configuration.global.GlobalConfigurationBuilder;
+import org.infinispan.configuration.parsing.ConfigurationBuilderHolder;
+import org.infinispan.configuration.parsing.ParserRegistry;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.notifications.Listener;
.getLogger(ClusterManager.class);
private DefaultCacheManager cm;
GossipRouter gossiper;
- private HashSet roleChangeListeners;
+ private HashSet<IListenRoleChange> roleChangeListeners;
private ViewChangedListener cacheManagerListener;
- private static String loopbackAddress = "127.0.0.1";
+ private static String loopbackAddress = InetAddress.getLoopbackAddress().getHostAddress();
+ private static final int gossipRouterPortDefault = 12001;
+ // defaultTransactionTimeout is 60 seconds
+ private static int DEFAULT_TRANSACTION_TIMEOUT = 60;
/**
* Start a JGroups GossipRouter if we are a supernode. The
*/
private GossipRouter startGossiper() {
boolean amIGossipRouter = false;
- Integer gossipRouterPortDefault = 12001;
Integer gossipRouterPort = gossipRouterPortDefault;
InetAddress gossipRouterAddress = null;
String supernodes_list = System.getProperty("supernodes",
loopbackAddress);
- StringBuffer sanitized_supernodes_list = new StringBuffer();
+ StringBuilder sanitized_supernodes_list = new StringBuilder();
List<InetAddress> myAddresses = new ArrayList<InetAddress>();
StringTokenizer supernodes = new StringTokenizer(supernodes_list, ":");
if (supernodes.hasMoreTokens()) {
// Populate the list of my addresses
try {
- Enumeration e = NetworkInterface.getNetworkInterfaces();
+ Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces();
while (e.hasMoreElements()) {
- NetworkInterface n = (NetworkInterface) e.nextElement();
- Enumeration ee = n.getInetAddresses();
+ NetworkInterface n = e.nextElement();
+ Enumeration<InetAddress> ee = n.getInetAddresses();
while (ee.hasMoreElements()) {
- InetAddress i = (InetAddress) ee.nextElement();
+ InetAddress i = ee.nextElement();
myAddresses.add(i);
}
}
}
while (supernodes.hasMoreTokens()) {
String curr_supernode = supernodes.nextToken();
- logger.debug("Examining supernode " + curr_supernode);
+ logger.debug("Examining supernode {}", curr_supernode);
StringTokenizer host_port = new StringTokenizer(curr_supernode,
"[]");
String host;
String port;
Integer port_num = gossipRouterPortDefault;
if (host_port.countTokens() > 2) {
- logger.error("Error parsing supernode " + curr_supernode
- + " proceed to the next one");
+ logger.error("Error parsing supernode {} proceed to the next one",
+ curr_supernode);
continue;
}
host = host_port.nextToken();
try {
hostAddr = InetAddress.getByName(host);
} catch (UnknownHostException ue) {
- logger.error("Host not known");
+ logger.error("Host {} is not known", host);
continue;
}
if (host_port.hasMoreTokens()) {
try {
port_num = Integer.valueOf(port);
} catch (NumberFormatException ne) {
- logger
- .error("Supplied supernode gossiepr port is not recognized, using standard gossipport");
+ logger.error("Supplied supernode gossip port is not recognized, using default gossip port {}",
+ gossipRouterPortDefault);
port_num = gossipRouterPortDefault;
}
if ((port_num > 65535) || (port_num < 0)) {
- logger
- .error("Supplied supernode gossip port is outside a valid TCP port range");
+ logger.error("Supplied supernode gossip port is outside a valid TCP port range");
port_num = gossipRouterPortDefault;
}
}
if (!sanitized_supernodes_list.toString().equals("")) {
sanitized_supernodes_list.append(",");
}
- sanitized_supernodes_list.append(hostAddr.getHostAddress() + "["
- + port_num + "]");
+ sanitized_supernodes_list.append(hostAddr.getHostAddress()).append("[").append(port_num).append("]");
}
if (amIGossipRouter) {
for (InetAddress myAddr : myAddresses) {
if (myAddr.isLoopbackAddress()
|| myAddr.isLinkLocalAddress()) {
- logger.debug("Skipping local address "
- + myAddr.getHostAddress());
+ logger.debug("Skipping local address {}",
+ myAddr.getHostAddress());
continue;
} else {
// First non-local address
myBind = myAddr.getHostAddress();
- logger.debug("First non-local address " + myBind);
+ logger.debug("First non-local address {}", myBind);
break;
}
}
.getProperty("jgroups.tcp.address");
if (jgroupAddress == null) {
if (myBind != null) {
- logger.debug("Set bind address to be " + myBind);
+ logger.debug("Set bind address to be {}", myBind);
System.setProperty("jgroups.tcp.address", myBind);
} else {
logger
System.setProperty("jgroups.tcp.address", "127.0.0.1");
}
} else {
- logger.debug("jgroup.tcp.address already set to be "
- + jgroupAddress);
+ logger.debug("jgroup.tcp.address already set to be {}",
+ jgroupAddress);
}
} catch (UnknownHostException uhe) {
logger
// host list
System.setProperty("jgroups.tcpgossip.initial_hosts",
sanitized_supernodes_list.toString());
- logger.debug("jgroups.tcp.address set to "
- + System.getProperty("jgroups.tcp.address"));
- logger.debug("jgroups.tcpgossip.initial_hosts set to "
- + System.getProperty("jgroups.tcpgossip.initial_hosts"));
+ logger.debug("jgroups.tcp.address set to {}",
+ System.getProperty("jgroups.tcp.address"));
+ logger.debug("jgroups.tcpgossip.initial_hosts set to {}",
+ System.getProperty("jgroups.tcpgossip.initial_hosts"));
GossipRouter res = null;
if (amIGossipRouter) {
- logger.info("I'm a GossipRouter will listen on port "
- + gossipRouterPort);
- res = new GossipRouter(gossipRouterPort);
+ logger.info("I'm a GossipRouter will listen on port {}",
+ gossipRouterPort);
+ // Start a GossipRouter with JMX support
+ res = new GossipRouter(gossipRouterPort, null, true);
}
return res;
}
+ private void exitOnSecurityException(Exception ioe) {
+ Throwable cause = ioe.getCause();
+ while (cause != null) {
+ if (cause instanceof java.lang.SecurityException) {
+ logger.error("Failed Cluster authentication. Stopping Controller...");
+ System.exit(0);
+ }
+ cause = cause.getCause();
+ }
+ }
+
public void start() {
this.gossiper = startGossiper();
if (this.gossiper != null) {
this.gossiper.start();
logger.info("Started GossipRouter");
} catch (Exception e) {
- logger.error("GossipRouter didn't start exception " + e
- + " met");
- StringWriter sw = new StringWriter();
- logger.error("Stack Trace that raised the exception");
- e.printStackTrace(new PrintWriter(sw));
- logger.error(sw.toString());
+ logger.error("GossipRouter didn't start. Exception Stack Trace",
+ e);
}
}
logger.info("Starting the ClusterManager");
try {
- //FIXME keeps throwing FileNotFoundException
- this.cm = new DefaultCacheManager("/config/infinispan-config.xml");
+ ParserRegistry parser = new ParserRegistry(this.getClass()
+ .getClassLoader());
+ String infinispanConfigFile =
+ System.getProperty("org.infinispan.config.file", "config/infinispan-config.xml");
+ logger.debug("Using configuration file:{}", infinispanConfigFile);
+ ConfigurationBuilderHolder holder = parser.parseFile(infinispanConfigFile);
+ GlobalConfigurationBuilder globalBuilder = holder.getGlobalConfigurationBuilder();
+ globalBuilder.serialization()
+ .classResolver(new ClassResolver())
+ .build();
+ this.cm = new DefaultCacheManager(holder, false);
logger.debug("Allocated ClusterManager");
if (this.cm != null) {
this.cm.start();
logger.debug("Started the ClusterManager");
}
} catch (Exception ioe) {
- StringWriter sw = new StringWriter();
logger.error("Cannot configure infinispan .. bailing out ");
logger.error("Stack Trace that raised th exception");
- ioe.printStackTrace(new PrintWriter(sw));
- logger.error(sw.toString());
+ logger.error("",ioe);
this.cm = null;
+ exitOnSecurityException(ioe);
this.stop();
}
- logger.debug("Cache Manager has value " + this.cm);
+ logger.debug("Cache Manager has value {}", this.cm);
}
public void stop() {
String cacheName, Set<cacheMode> cMode) throws CacheExistException,
CacheConfigException {
EmbeddedCacheManager manager = this.cm;
- Cache c;
+ Cache<Object,Object> c;
String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
if (manager == null) {
return null;
throw new CacheExistException();
}
- // Sanity check to avoid contrasting parameters
- if (cMode.containsAll(EnumSet.of(
- IClusterServices.cacheMode.NON_TRANSACTIONAL,
+ // Sanity check to avoid contrasting parameters between transactional
+ // and not
+ if (cMode.containsAll(EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL,
IClusterServices.cacheMode.TRANSACTIONAL))) {
throw new CacheConfigException();
}
- if (cMode.contains(IClusterServices.cacheMode.NON_TRANSACTIONAL)) {
- c = manager.getCache(realCacheName);
- return c;
- } else if (cMode.contains(IClusterServices.cacheMode.TRANSACTIONAL)) {
- Configuration rc = manager
- .getCacheConfiguration("transactional-type");
- manager.defineConfiguration(realCacheName, rc);
- c = manager.getCache(realCacheName);
- return c;
+ // Sanity check to avoid contrasting parameters between sync and async
+ if (cMode.containsAll(EnumSet.of(IClusterServices.cacheMode.SYNC, IClusterServices.cacheMode.ASYNC))) {
+ throw new CacheConfigException();
}
- return null;
+
+ Configuration fromTemplateConfig = null;
+ /*
+ * Fetch transactional/non-transactional templates
+ */
+ // Check if transactional
+ if (cMode.contains(IClusterServices.cacheMode.TRANSACTIONAL)) {
+ fromTemplateConfig = manager.getCacheConfiguration("transactional-type");
+ } else if (cMode.contains(IClusterServices.cacheMode.NON_TRANSACTIONAL)) {
+ fromTemplateConfig = manager.getDefaultCacheConfiguration();
+ }
+
+ // If none set the transactional property then just return null
+ if (fromTemplateConfig == null) {
+ return null;
+ }
+
+ ConfigurationBuilder builder = new ConfigurationBuilder();
+ builder.read(fromTemplateConfig);
+ /*
+ * Now evaluate async/sync
+ */
+ if (cMode.contains(IClusterServices.cacheMode.ASYNC)) {
+ builder.clustering()
+ .cacheMode(fromTemplateConfig.clustering()
+ .cacheMode()
+ .toAsync());
+ } else if (cMode.contains(IClusterServices.cacheMode.SYNC)) {
+ builder.clustering()
+ .cacheMode(fromTemplateConfig.clustering()
+ .cacheMode()
+ .toSync());
+ }
+
+ manager.defineConfiguration(realCacheName, builder.build());
+ c = manager.getCache(realCacheName);
+ return c;
}
@Override
public ConcurrentMap<?, ?> getCache(String containerName, String cacheName) {
EmbeddedCacheManager manager = this.cm;
- Cache c;
+ Cache<Object,Object> c;
String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
if (manager == null) {
return null;
@Override
public boolean existCache(String containerName, String cacheName) {
EmbeddedCacheManager manager = this.cm;
- String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
+
if (manager == null) {
return false;
}
+
+ String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
return manager.cacheExists(realCacheName);
}
@Override
public Set<String> getCacheList(String containerName) {
- Set<String> perContainerCaches = new HashSet();
+ Set<String> perContainerCaches = new HashSet<String>();
EmbeddedCacheManager manager = this.cm;
if (manager == null) {
return null;
}
for (String cacheName : manager.getCacheNames()) {
+ if (!manager.isRunning(cacheName)) continue;
if (cacheName.startsWith("{" + containerName + "}_")) {
String[] res = cacheName.split("[{}]");
if (res.length >= 4 && res[1].equals(containerName)
public void addListener(String containerName, String cacheName,
IGetUpdates<?, ?> u) throws CacheListenerAddException {
EmbeddedCacheManager manager = this.cm;
- Cache c;
+ Cache<Object,Object> c;
String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
if (manager == null) {
return;
public Set<IGetUpdates<?, ?>> getListeners(String containerName,
String cacheName) {
EmbeddedCacheManager manager = this.cm;
- Cache c;
+ Cache<Object,Object> c;
String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
if (manager == null) {
return null;
}
c = manager.getCache(realCacheName);
- Set<IGetUpdates<?, ?>> res = new HashSet();
+ Set<IGetUpdates<?, ?>> res = new HashSet<IGetUpdates<?, ?>>();
Set<Object> listeners = c.getListeners();
for (Object listener : listeners) {
if (listener instanceof CacheListenerContainer) {
public void removeListener(String containerName, String cacheName,
IGetUpdates<?, ?> u) {
EmbeddedCacheManager manager = this.cm;
- Cache c;
+ Cache<Object,Object> c;
String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
if (manager == null) {
return;
@Override
public void tbegin() throws NotSupportedException, SystemException {
+ // call tbegin with the default timeout
+ tbegin(DEFAULT_TRANSACTION_TIMEOUT, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public void tbegin(long timeout, TimeUnit unit) throws NotSupportedException, SystemException {
EmbeddedCacheManager manager = this.cm;
if (manager == null) {
throw new IllegalStateException();
if (tm == null) {
throw new IllegalStateException();
}
+ long timeoutSec = unit.toSeconds(timeout);
+ if((timeoutSec > Integer.MAX_VALUE) || (timeoutSec <= 0)) {
+ // fall back to the default timeout
+ tm.setTransactionTimeout(DEFAULT_TRANSACTION_TIMEOUT);
+ } else {
+ // cast is ok here
+ // as here we are sure that timeoutSec < = Integer.MAX_VALUE.
+ tm.setTransactionTimeout((int) timeoutSec);
+ }
tm.begin();
}
EmbeddedCacheManager manager = this.cm;
if (manager == null) {
// In case we cannot fetch the information, lets assume we
- // are standby, so to have less responsability.
+ // are standby, so to have less responsibility.
return true;
}
return (!manager.isCoordinator());
EmbeddedCacheManager manager = this.cm;
if ((manager == null) || (a == null)) {
// In case we cannot fetch the information, lets assume we
- // are standby, so to have less responsability.
+ // are standby, so to have less responsibility.
return null;
}
Transport t = manager.getTransport();
return null;
}
+ @Override
public List<InetAddress> getClusteredControllers() {
EmbeddedCacheManager manager = this.cm;
if (manager == null) {
return null;
}
List<Address> controllers = manager.getMembers();
- if ((controllers == null) || controllers.size() == 0)
+ if ((controllers == null) || controllers.size() == 0) {
return null;
+ }
List<InetAddress> clusteredControllers = new ArrayList<InetAddress>();
for (Address a : controllers) {
InetAddress inetAddress = addressToInetAddress(a);
if (inetAddress != null
- && !inetAddress.getHostAddress().equals(loopbackAddress))
+ && !inetAddress.getHostAddress().equals(loopbackAddress)) {
clusteredControllers.add(inetAddress);
+ }
}
return clusteredControllers;
}
+ @Override
public InetAddress getMyAddress() {
EmbeddedCacheManager manager = this.cm;
if (manager == null) {
EmbeddedCacheManager manager = this.cm;
if (manager == null) {
// In case we cannot fetch the information, lets assume we
- // are standby, so to have less responsability.
+ // are standby, so to have less responsibility.
return null;
}
EmbeddedCacheManager manager = this.cm;
if (manager == null) {
// In case we cannot fetch the information, lets assume we
- // are standby, so to have less responsability.
+ // are standby, so to have less responsibility.
throw new ListenRoleChangeAddException();
}
if (this.roleChangeListeners == null) {
- this.roleChangeListeners = new HashSet();
+ this.roleChangeListeners = new HashSet<IListenRoleChange>();
this.cacheManagerListener = new ViewChangedListener(
this.roleChangeListeners);
manager.addListener(this.cacheManagerListener);
EmbeddedCacheManager manager = this.cm;
if (manager == null) {
// In case we cannot fetch the information, lets assume we
- // are standby, so to have less responsability.
+ // are standby, so to have less responsibility.
return;
}