3 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
10 package org.opendaylight.controller.clustering.services_implementation.internal;
12 import java.net.InetAddress;
13 import java.net.NetworkInterface;
14 import java.net.SocketException;
15 import java.net.UnknownHostException;
16 import java.util.ArrayList;
17 import java.util.EnumSet;
18 import java.util.Enumeration;
19 import java.util.HashSet;
20 import java.util.List;
21 import java.util.Properties;
23 import java.util.StringTokenizer;
24 import java.util.concurrent.ConcurrentMap;
25 import java.util.concurrent.TimeUnit;
27 import javax.transaction.HeuristicMixedException;
28 import javax.transaction.HeuristicRollbackException;
29 import javax.transaction.NotSupportedException;
30 import javax.transaction.RollbackException;
31 import javax.transaction.SystemException;
32 import javax.transaction.Transaction;
33 import javax.transaction.TransactionManager;
35 import org.infinispan.Cache;
36 import org.infinispan.configuration.cache.Configuration;
37 import org.infinispan.configuration.cache.ConfigurationBuilder;
38 import org.infinispan.configuration.global.GlobalConfigurationBuilder;
39 import org.infinispan.configuration.parsing.ConfigurationBuilderHolder;
40 import org.infinispan.configuration.parsing.ParserRegistry;
41 import org.infinispan.manager.DefaultCacheManager;
42 import org.infinispan.manager.EmbeddedCacheManager;
43 import org.infinispan.notifications.Listener;
44 import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
45 import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
46 import org.infinispan.remoting.transport.Address;
47 import org.infinispan.remoting.transport.Transport;
48 import org.infinispan.remoting.transport.jgroups.JGroupsAddress;
49 import org.infinispan.remoting.transport.jgroups.JGroupsTransport;
50 import org.jgroups.Channel;
51 import org.jgroups.Event;
52 import org.jgroups.stack.GossipRouter;
53 import org.opendaylight.controller.clustering.services.CacheConfigException;
54 import org.opendaylight.controller.clustering.services.CacheExistException;
55 import org.opendaylight.controller.clustering.services.CacheListenerAddException;
56 import org.opendaylight.controller.clustering.services.IClusterServices;
57 import org.opendaylight.controller.clustering.services.IGetUpdates;
58 import org.opendaylight.controller.clustering.services.IListenRoleChange;
59 import org.opendaylight.controller.clustering.services.ListenRoleChangeAddException;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
63 public class ClusterManager implements IClusterServices {
64 protected static final Logger logger = LoggerFactory
65 .getLogger(ClusterManager.class);
66 private DefaultCacheManager cm;
67 GossipRouter gossiper;
68 private HashSet<IListenRoleChange> roleChangeListeners;
69 private ViewChangedListener cacheManagerListener;
71 private static String loopbackAddress = "127.0.0.1";
73 // defaultTransactionTimeout is 60 seconds
74 private static int DEFAULT_TRANSACTION_TIMEOUT = 60;
77 * Start a JGroups GossipRouter if we are a supernode. The
78 * GosispRouter is nothing more than a simple
79 * rendevouz-pointer. All the nodes that wants to join the cluster
80 * will come to any of the rendevouz point and they introduce the
81 * nodes to all the others. Once the meet and greet phase if over,
82 * the nodes will open a full-mesh with the remaining n-1 nodes,
83 * so even if the GossipRouter goes down nothing is lost.
84 * NOTE: This function has the side effect to set some of the
85 * JGROUPS configurations, this because in this function already
86 * we try to retrieve some of the network capabilities of the
87 * device and so it's better not to do that again
90 * @return GossipRouter
92 private GossipRouter startGossiper() {
93 boolean amIGossipRouter = false;
94 Integer gossipRouterPortDefault = 12001;
95 Integer gossipRouterPort = gossipRouterPortDefault;
96 InetAddress gossipRouterAddress = null;
97 String supernodes_list = System.getProperty("supernodes",
99 StringBuffer sanitized_supernodes_list = new StringBuffer();
100 List<InetAddress> myAddresses = new ArrayList<InetAddress>();
102 StringTokenizer supernodes = new StringTokenizer(supernodes_list, ":");
103 if (supernodes.hasMoreTokens()) {
104 // Populate the list of my addresses
106 Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces();
107 while (e.hasMoreElements()) {
108 NetworkInterface n = e.nextElement();
109 Enumeration<InetAddress> ee = n.getInetAddresses();
110 while (ee.hasMoreElements()) {
111 InetAddress i = ee.nextElement();
115 } catch (SocketException se) {
116 logger.error("Cannot get the list of network interfaces");
120 while (supernodes.hasMoreTokens()) {
121 String curr_supernode = supernodes.nextToken();
122 logger.debug("Examining supernode {}", curr_supernode);
123 StringTokenizer host_port = new StringTokenizer(curr_supernode,
127 Integer port_num = gossipRouterPortDefault;
128 if (host_port.countTokens() > 2) {
129 logger.error("Error parsing supernode {} proceed to the next one",
133 host = host_port.nextToken();
134 InetAddress hostAddr;
136 hostAddr = InetAddress.getByName(host);
137 } catch (UnknownHostException ue) {
138 logger.error("Host not known");
141 if (host_port.hasMoreTokens()) {
142 port = host_port.nextToken();
144 port_num = Integer.valueOf(port);
145 } catch (NumberFormatException ne) {
147 .error("Supplied supernode gossiepr port is not recognized, using standard gossipport");
148 port_num = gossipRouterPortDefault;
150 if ((port_num > 65535) || (port_num < 0)) {
152 .error("Supplied supernode gossip port is outside a valid TCP port range");
153 port_num = gossipRouterPortDefault;
156 if (!amIGossipRouter) {
158 for (InetAddress myAddr : myAddresses) {
159 if (myAddr.equals(hostAddr)) {
160 amIGossipRouter = true;
161 gossipRouterAddress = hostAddr;
162 gossipRouterPort = port_num;
168 if (!sanitized_supernodes_list.toString().equals("")) {
169 sanitized_supernodes_list.append(",");
171 sanitized_supernodes_list.append(hostAddr.getHostAddress() + "["
175 if (amIGossipRouter) {
176 // Set the Jgroups binding interface to the one we got
177 // from the supernodes attribute
178 if (gossipRouterAddress != null) {
179 System.setProperty("jgroups.tcp.address", gossipRouterAddress
183 // Set the Jgroup binding interface to the one we are well
184 // known outside or else to the first with non-local
187 String myBind = InetAddress.getLocalHost().getHostAddress();
189 || InetAddress.getLocalHost().isLoopbackAddress()) {
190 for (InetAddress myAddr : myAddresses) {
191 if (myAddr.isLoopbackAddress()
192 || myAddr.isLinkLocalAddress()) {
193 logger.debug("Skipping local address {}",
194 myAddr.getHostAddress());
197 // First non-local address
198 myBind = myAddr.getHostAddress();
199 logger.debug("First non-local address {}", myBind);
204 String jgroupAddress = System
205 .getProperty("jgroups.tcp.address");
206 if (jgroupAddress == null) {
207 if (myBind != null) {
208 logger.debug("Set bind address to be {}", myBind);
209 System.setProperty("jgroups.tcp.address", myBind);
212 .debug("Set bind address to be LOCALHOST=127.0.0.1");
213 System.setProperty("jgroups.tcp.address", "127.0.0.1");
216 logger.debug("jgroup.tcp.address already set to be {}",
219 } catch (UnknownHostException uhe) {
221 .error("Met UnknownHostException while trying to get binding address for jgroups");
225 // The supernodes list constitute also the tcpgossip initial
227 System.setProperty("jgroups.tcpgossip.initial_hosts",
228 sanitized_supernodes_list.toString());
229 logger.debug("jgroups.tcp.address set to {}",
230 System.getProperty("jgroups.tcp.address"));
231 logger.debug("jgroups.tcpgossip.initial_hosts set to {}",
232 System.getProperty("jgroups.tcpgossip.initial_hosts"));
233 GossipRouter res = null;
234 if (amIGossipRouter) {
235 logger.info("I'm a GossipRouter will listen on port {}",
237 // Start a GossipRouter with JMX support
238 res = new GossipRouter(gossipRouterPort, null, true);
243 private void exitOnSecurityException(Exception ioe) {
244 Throwable cause = ioe.getCause();
245 while (cause != null) {
246 if (cause instanceof java.lang.SecurityException) {
247 logger.error("Failed Cluster authentication. Stopping Controller...");
250 cause = cause.getCause();
254 public void start() {
255 this.gossiper = startGossiper();
256 if (this.gossiper != null) {
257 logger.debug("Trying to start Gossiper");
259 this.gossiper.start();
260 logger.info("Started GossipRouter");
261 } catch (Exception e) {
262 logger.error("GossipRouter didn't start. Exception Stack Trace",
266 logger.info("Starting the ClusterManager");
268 ParserRegistry parser = new ParserRegistry(this.getClass()
270 String infinispanConfigFile =
271 System.getProperty("org.infinispan.config.file", "config/infinispan-config.xml");
272 logger.debug("Using configuration file:{}", infinispanConfigFile);
273 ConfigurationBuilderHolder holder = parser.parseFile(infinispanConfigFile);
274 GlobalConfigurationBuilder globalBuilder = holder.getGlobalConfigurationBuilder();
275 globalBuilder.serialization()
276 .classResolver(new ClassResolver())
278 this.cm = new DefaultCacheManager(holder, false);
279 logger.debug("Allocated ClusterManager");
280 if (this.cm != null) {
282 this.cm.startCache();
283 logger.debug("Started the ClusterManager");
285 } catch (Exception ioe) {
286 logger.error("Cannot configure infinispan .. bailing out ");
287 logger.error("Stack Trace that raised th exception");
288 logger.error("",ioe);
290 exitOnSecurityException(ioe);
293 logger.debug("Cache Manager has value {}", this.cm);
297 logger.info("Stopping the ClusterManager");
298 if (this.cm != null) {
299 logger.info("Found a valid ClusterManager, now let it be stopped");
303 if (this.gossiper != null) {
304 this.gossiper.stop();
305 this.gossiper = null;
310 public ConcurrentMap<?, ?> createCache(String containerName,
311 String cacheName, Set<cacheMode> cMode) throws CacheExistException,
312 CacheConfigException {
313 EmbeddedCacheManager manager = this.cm;
314 Cache<Object,Object> c;
315 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
316 if (manager == null) {
320 if (manager.cacheExists(realCacheName)) {
321 throw new CacheExistException();
324 // Sanity check to avoid contrasting parameters between transactional
326 if (cMode.containsAll(EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL,
327 IClusterServices.cacheMode.TRANSACTIONAL))) {
328 throw new CacheConfigException();
331 // Sanity check to avoid contrasting parameters between sync and async
332 if (cMode.containsAll(EnumSet.of(IClusterServices.cacheMode.SYNC, IClusterServices.cacheMode.ASYNC))) {
333 throw new CacheConfigException();
336 Configuration fromTemplateConfig = null;
338 * Fetch transactional/non-transactional templates
340 // Check if transactional
341 if (cMode.contains(IClusterServices.cacheMode.TRANSACTIONAL)) {
342 fromTemplateConfig = manager.getCacheConfiguration("transactional-type");
343 } else if (cMode.contains(IClusterServices.cacheMode.NON_TRANSACTIONAL)) {
344 fromTemplateConfig = manager.getDefaultCacheConfiguration();
347 // If none set the transactional property then just return null
348 if (fromTemplateConfig == null) {
352 ConfigurationBuilder builder = new ConfigurationBuilder();
353 builder.read(fromTemplateConfig);
355 * Now evaluate async/sync
357 if (cMode.contains(IClusterServices.cacheMode.ASYNC)) {
359 .cacheMode(fromTemplateConfig.clustering()
362 } else if (cMode.contains(IClusterServices.cacheMode.SYNC)) {
364 .cacheMode(fromTemplateConfig.clustering()
369 manager.defineConfiguration(realCacheName, builder.build());
370 c = manager.getCache(realCacheName);
375 public ConcurrentMap<?, ?> getCache(String containerName, String cacheName) {
376 EmbeddedCacheManager manager = this.cm;
377 Cache<Object,Object> c;
378 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
379 if (manager == null) {
383 if (manager.cacheExists(realCacheName)) {
384 c = manager.getCache(realCacheName);
391 public void destroyCache(String containerName, String cacheName) {
392 EmbeddedCacheManager manager = this.cm;
393 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
394 if (manager == null) {
397 if (manager.cacheExists(realCacheName)) {
398 manager.removeCache(realCacheName);
403 public boolean existCache(String containerName, String cacheName) {
404 EmbeddedCacheManager manager = this.cm;
406 if (manager == null) {
410 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
411 return manager.cacheExists(realCacheName);
415 public Set<String> getCacheList(String containerName) {
416 Set<String> perContainerCaches = new HashSet<String>();
417 EmbeddedCacheManager manager = this.cm;
418 if (manager == null) {
421 for (String cacheName : manager.getCacheNames()) {
422 if (!manager.isRunning(cacheName)) continue;
423 if (cacheName.startsWith("{" + containerName + "}_")) {
424 String[] res = cacheName.split("[{}]");
425 if (res.length >= 4 && res[1].equals(containerName)
426 && res[2].equals("_")) {
427 perContainerCaches.add(res[3]);
432 return (perContainerCaches);
436 public Properties getCacheProperties(String containerName, String cacheName) {
437 EmbeddedCacheManager manager = this.cm;
438 if (manager == null) {
441 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
442 if (!manager.cacheExists(realCacheName)) {
445 Configuration conf = manager.getCache(realCacheName).getAdvancedCache()
446 .getCacheConfiguration();
447 Properties p = new Properties();
448 p.setProperty(IClusterServices.cacheProps.TRANSACTION_PROP.toString(),
449 conf.transaction().toString());
450 p.setProperty(IClusterServices.cacheProps.CLUSTERING_PROP.toString(),
451 conf.clustering().toString());
452 p.setProperty(IClusterServices.cacheProps.LOCKING_PROP.toString(), conf
453 .locking().toString());
458 public void addListener(String containerName, String cacheName,
459 IGetUpdates<?, ?> u) throws CacheListenerAddException {
460 EmbeddedCacheManager manager = this.cm;
461 Cache<Object,Object> c;
462 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
463 if (manager == null) {
467 if (!manager.cacheExists(realCacheName)) {
468 throw new CacheListenerAddException();
470 c = manager.getCache(realCacheName);
471 CacheListenerContainer cl = new CacheListenerContainer(u,
472 containerName, cacheName);
477 public Set<IGetUpdates<?, ?>> getListeners(String containerName,
479 EmbeddedCacheManager manager = this.cm;
480 Cache<Object,Object> c;
481 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
482 if (manager == null) {
486 if (!manager.cacheExists(realCacheName)) {
489 c = manager.getCache(realCacheName);
491 Set<IGetUpdates<?, ?>> res = new HashSet<IGetUpdates<?, ?>>();
492 Set<Object> listeners = c.getListeners();
493 for (Object listener : listeners) {
494 if (listener instanceof CacheListenerContainer) {
495 CacheListenerContainer cl = (CacheListenerContainer) listener;
496 res.add(cl.whichListener());
504 public void removeListener(String containerName, String cacheName,
505 IGetUpdates<?, ?> u) {
506 EmbeddedCacheManager manager = this.cm;
507 Cache<Object,Object> c;
508 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
509 if (manager == null) {
513 if (!manager.cacheExists(realCacheName)) {
516 c = manager.getCache(realCacheName);
518 Set<Object> listeners = c.getListeners();
519 for (Object listener : listeners) {
520 if (listener instanceof CacheListenerContainer) {
521 CacheListenerContainer cl = (CacheListenerContainer) listener;
522 if (cl.whichListener() == u) {
523 c.removeListener(listener);
531 public void tbegin() throws NotSupportedException, SystemException {
532 // call tbegin with the default timeout
533 tbegin(DEFAULT_TRANSACTION_TIMEOUT, TimeUnit.SECONDS);
537 public void tbegin(long timeout, TimeUnit unit) throws NotSupportedException, SystemException {
538 EmbeddedCacheManager manager = this.cm;
539 if (manager == null) {
540 throw new IllegalStateException();
542 TransactionManager tm = manager.getCache("transactional-type")
543 .getAdvancedCache().getTransactionManager();
545 throw new IllegalStateException();
547 long timeoutSec = unit.toSeconds(timeout);
548 if((timeoutSec > Integer.MAX_VALUE) || (timeoutSec <= 0)) {
549 // fall back to the default timeout
550 tm.setTransactionTimeout(DEFAULT_TRANSACTION_TIMEOUT);
553 // as here we are sure that timeoutSec < = Integer.MAX_VALUE.
554 tm.setTransactionTimeout((int) timeoutSec);
560 public void tcommit() throws RollbackException, HeuristicMixedException,
561 HeuristicRollbackException, java.lang.SecurityException,
562 java.lang.IllegalStateException, SystemException {
563 EmbeddedCacheManager manager = this.cm;
564 if (manager == null) {
565 throw new IllegalStateException();
567 TransactionManager tm = manager.getCache("transactional-type")
568 .getAdvancedCache().getTransactionManager();
570 throw new IllegalStateException();
576 public void trollback() throws java.lang.IllegalStateException,
577 java.lang.SecurityException, SystemException {
578 EmbeddedCacheManager manager = this.cm;
579 if (manager == null) {
580 throw new IllegalStateException();
582 TransactionManager tm = manager.getCache("transactional-type")
583 .getAdvancedCache().getTransactionManager();
585 throw new IllegalStateException();
591 public Transaction tgetTransaction() throws SystemException {
592 EmbeddedCacheManager manager = this.cm;
593 if (manager == null) {
594 throw new IllegalStateException();
596 TransactionManager tm = manager.getCache("transactional-type")
597 .getAdvancedCache().getTransactionManager();
601 return tm.getTransaction();
605 public boolean amIStandby() {
606 EmbeddedCacheManager manager = this.cm;
607 if (manager == null) {
608 // In case we cannot fetch the information, lets assume we
609 // are standby, so to have less responsibility.
612 return (!manager.isCoordinator());
615 private InetAddress addressToInetAddress(Address a) {
616 EmbeddedCacheManager manager = this.cm;
617 if ((manager == null) || (a == null)) {
618 // In case we cannot fetch the information, lets assume we
619 // are standby, so to have less responsibility.
622 Transport t = manager.getTransport();
623 if (t instanceof JGroupsTransport) {
624 JGroupsTransport jt = (JGroupsTransport) t;
625 Channel c = jt.getChannel();
626 if (a instanceof JGroupsAddress) {
627 JGroupsAddress ja = (JGroupsAddress) a;
628 org.jgroups.Address phys = (org.jgroups.Address) c
629 .down(new Event(Event.GET_PHYSICAL_ADDRESS, ja
630 .getJGroupsAddress()));
631 if (phys instanceof org.jgroups.stack.IpAddress) {
632 InetAddress bindAddress = ((org.jgroups.stack.IpAddress) phys)
642 public List<InetAddress> getClusteredControllers() {
643 EmbeddedCacheManager manager = this.cm;
644 if (manager == null) {
647 List<Address> controllers = manager.getMembers();
648 if ((controllers == null) || controllers.size() == 0) {
652 List<InetAddress> clusteredControllers = new ArrayList<InetAddress>();
653 for (Address a : controllers) {
654 InetAddress inetAddress = addressToInetAddress(a);
655 if (inetAddress != null
656 && !inetAddress.getHostAddress().equals(loopbackAddress)) {
657 clusteredControllers.add(inetAddress);
660 return clusteredControllers;
664 public InetAddress getMyAddress() {
665 EmbeddedCacheManager manager = this.cm;
666 if (manager == null) {
669 return addressToInetAddress(manager.getAddress());
673 public InetAddress getActiveAddress() {
674 EmbeddedCacheManager manager = this.cm;
675 if (manager == null) {
676 // In case we cannot fetch the information, lets assume we
677 // are standby, so to have less responsibility.
681 return addressToInetAddress(manager.getCoordinator());
685 public void listenRoleChange(IListenRoleChange i)
686 throws ListenRoleChangeAddException {
687 EmbeddedCacheManager manager = this.cm;
688 if (manager == null) {
689 // In case we cannot fetch the information, lets assume we
690 // are standby, so to have less responsibility.
691 throw new ListenRoleChangeAddException();
694 if (this.roleChangeListeners == null) {
695 this.roleChangeListeners = new HashSet<IListenRoleChange>();
696 this.cacheManagerListener = new ViewChangedListener(
697 this.roleChangeListeners);
698 manager.addListener(this.cacheManagerListener);
701 if (this.roleChangeListeners != null) {
702 this.roleChangeListeners.add(i);
707 public void unlistenRoleChange(IListenRoleChange i) {
708 EmbeddedCacheManager manager = this.cm;
709 if (manager == null) {
710 // In case we cannot fetch the information, lets assume we
711 // are standby, so to have less responsibility.
715 if (this.roleChangeListeners != null) {
716 this.roleChangeListeners.remove(i);
719 if ((this.roleChangeListeners != null && this.roleChangeListeners
721 && (this.cacheManagerListener != null)) {
722 manager.removeListener(this.cacheManagerListener);
723 this.cacheManagerListener = null;
724 this.roleChangeListeners = null;
729 public class ViewChangedListener {
730 Set<IListenRoleChange> roleListeners;
732 public ViewChangedListener(Set<IListenRoleChange> s) {
733 this.roleListeners = s;
737 public void viewChanged(ViewChangedEvent e) {
738 for (IListenRoleChange i : this.roleListeners) {
739 i.newActiveAvailable();