3 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
10 package org.opendaylight.controller.clustering.services_implementation.internal;
12 import java.net.InetAddress;
13 import java.net.NetworkInterface;
14 import java.net.SocketException;
15 import java.net.UnknownHostException;
16 import java.util.ArrayList;
17 import java.util.EnumSet;
18 import java.util.Enumeration;
19 import java.util.HashSet;
20 import java.util.List;
21 import java.util.Properties;
23 import java.util.StringTokenizer;
24 import java.util.concurrent.ConcurrentMap;
26 import javax.transaction.HeuristicMixedException;
27 import javax.transaction.HeuristicRollbackException;
28 import javax.transaction.NotSupportedException;
29 import javax.transaction.RollbackException;
30 import javax.transaction.SystemException;
31 import javax.transaction.Transaction;
32 import javax.transaction.TransactionManager;
34 import org.infinispan.Cache;
35 import org.infinispan.configuration.cache.Configuration;
36 import org.infinispan.manager.DefaultCacheManager;
37 import org.infinispan.manager.EmbeddedCacheManager;
38 import org.infinispan.notifications.Listener;
39 import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
40 import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
41 import org.infinispan.remoting.transport.Address;
42 import org.infinispan.remoting.transport.Transport;
43 import org.infinispan.remoting.transport.jgroups.JGroupsAddress;
44 import org.infinispan.remoting.transport.jgroups.JGroupsTransport;
45 import org.jgroups.Channel;
46 import org.jgroups.Event;
47 import org.jgroups.stack.GossipRouter;
48 import org.opendaylight.controller.clustering.services.CacheConfigException;
49 import org.opendaylight.controller.clustering.services.CacheExistException;
50 import org.opendaylight.controller.clustering.services.CacheListenerAddException;
51 import org.opendaylight.controller.clustering.services.IClusterServices;
52 import org.opendaylight.controller.clustering.services.IGetUpdates;
53 import org.opendaylight.controller.clustering.services.IListenRoleChange;
54 import org.opendaylight.controller.clustering.services.ListenRoleChangeAddException;
55 import org.opendaylight.controller.sal.core.IContainerAware;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
59 public class ClusterManager implements IClusterServices, IContainerAware {
60 protected static final Logger logger = LoggerFactory
61 .getLogger(ClusterManager.class);
62 private DefaultCacheManager cm;
63 GossipRouter gossiper;
64 private HashSet<IListenRoleChange> roleChangeListeners;
65 private ViewChangedListener cacheManagerListener;
67 private static String loopbackAddress = "127.0.0.1";
70 * Start a JGroups GossipRouter if we are a supernode. The
71 * GosispRouter is nothing more than a simple
72 * rendevouz-pointer. All the nodes that wants to join the cluster
73 * will come to any of the rendevouz point and they introduce the
74 * nodes to all the others. Once the meet and greet phase if over,
75 * the nodes will open a full-mesh with the remaining n-1 nodes,
76 * so even if the GossipRouter goes down nothing is lost.
77 * NOTE: This function has the side effect to set some of the
78 * JGROUPS configurations, this because in this function already
79 * we try to retrieve some of the network capabilities of the
80 * device and so it's better not to do that again
83 * @return GossipRouter
85 private GossipRouter startGossiper() {
86 boolean amIGossipRouter = false;
87 Integer gossipRouterPortDefault = 12001;
88 Integer gossipRouterPort = gossipRouterPortDefault;
89 InetAddress gossipRouterAddress = null;
90 String supernodes_list = System.getProperty("supernodes",
92 StringBuffer sanitized_supernodes_list = new StringBuffer();
93 List<InetAddress> myAddresses = new ArrayList<InetAddress>();
95 StringTokenizer supernodes = new StringTokenizer(supernodes_list, ":");
96 if (supernodes.hasMoreTokens()) {
97 // Populate the list of my addresses
99 Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces();
100 while (e.hasMoreElements()) {
101 NetworkInterface n = e.nextElement();
102 Enumeration<InetAddress> ee = n.getInetAddresses();
103 while (ee.hasMoreElements()) {
104 InetAddress i = ee.nextElement();
108 } catch (SocketException se) {
109 logger.error("Cannot get the list of network interfaces");
113 while (supernodes.hasMoreTokens()) {
114 String curr_supernode = supernodes.nextToken();
115 logger.debug("Examining supernode {}", curr_supernode);
116 StringTokenizer host_port = new StringTokenizer(curr_supernode,
120 Integer port_num = gossipRouterPortDefault;
121 if (host_port.countTokens() > 2) {
122 logger.error("Error parsing supernode {} proceed to the next one",
126 host = host_port.nextToken();
127 InetAddress hostAddr;
129 hostAddr = InetAddress.getByName(host);
130 } catch (UnknownHostException ue) {
131 logger.error("Host not known");
134 if (host_port.hasMoreTokens()) {
135 port = host_port.nextToken();
137 port_num = Integer.valueOf(port);
138 } catch (NumberFormatException ne) {
140 .error("Supplied supernode gossiepr port is not recognized, using standard gossipport");
141 port_num = gossipRouterPortDefault;
143 if ((port_num > 65535) || (port_num < 0)) {
145 .error("Supplied supernode gossip port is outside a valid TCP port range");
146 port_num = gossipRouterPortDefault;
149 if (!amIGossipRouter) {
151 for (InetAddress myAddr : myAddresses) {
152 if (myAddr.equals(hostAddr)) {
153 amIGossipRouter = true;
154 gossipRouterAddress = hostAddr;
155 gossipRouterPort = port_num;
161 if (!sanitized_supernodes_list.toString().equals("")) {
162 sanitized_supernodes_list.append(",");
164 sanitized_supernodes_list.append(hostAddr.getHostAddress() + "["
168 if (amIGossipRouter) {
169 // Set the Jgroups binding interface to the one we got
170 // from the supernodes attribute
171 if (gossipRouterAddress != null) {
172 System.setProperty("jgroups.tcp.address", gossipRouterAddress
176 // Set the Jgroup binding interface to the one we are well
177 // known outside or else to the first with non-local
180 String myBind = InetAddress.getLocalHost().getHostAddress();
182 || InetAddress.getLocalHost().isLoopbackAddress()) {
183 for (InetAddress myAddr : myAddresses) {
184 if (myAddr.isLoopbackAddress()
185 || myAddr.isLinkLocalAddress()) {
186 logger.debug("Skipping local address {}",
187 myAddr.getHostAddress());
190 // First non-local address
191 myBind = myAddr.getHostAddress();
192 logger.debug("First non-local address {}", myBind);
197 String jgroupAddress = System
198 .getProperty("jgroups.tcp.address");
199 if (jgroupAddress == null) {
200 if (myBind != null) {
201 logger.debug("Set bind address to be {}", myBind);
202 System.setProperty("jgroups.tcp.address", myBind);
205 .debug("Set bind address to be LOCALHOST=127.0.0.1");
206 System.setProperty("jgroups.tcp.address", "127.0.0.1");
209 logger.debug("jgroup.tcp.address already set to be {}",
212 } catch (UnknownHostException uhe) {
214 .error("Met UnknownHostException while trying to get binding address for jgroups");
218 // The supernodes list constitute also the tcpgossip initial
220 System.setProperty("jgroups.tcpgossip.initial_hosts",
221 sanitized_supernodes_list.toString());
222 logger.debug("jgroups.tcp.address set to {}",
223 System.getProperty("jgroups.tcp.address"));
224 logger.debug("jgroups.tcpgossip.initial_hosts set to {}",
225 System.getProperty("jgroups.tcpgossip.initial_hosts"));
226 GossipRouter res = null;
227 if (amIGossipRouter) {
228 logger.info("I'm a GossipRouter will listen on port {}",
230 res = new GossipRouter(gossipRouterPort);
235 public void start() {
236 this.gossiper = startGossiper();
237 if (this.gossiper != null) {
238 logger.debug("Trying to start Gossiper");
240 this.gossiper.start();
241 logger.info("Started GossipRouter");
242 } catch (Exception e) {
243 logger.error("GossipRouter didn't start. Exception Stack Trace",
247 logger.info("Starting the ClusterManager");
249 //FIXME keeps throwing FileNotFoundException
250 this.cm = new DefaultCacheManager("config/infinispan-config.xml");
251 logger.debug("Allocated ClusterManager");
252 if (this.cm != null) {
254 this.cm.startCache();
255 logger.debug("Started the ClusterManager");
257 } catch (Exception ioe) {
258 logger.error("Cannot configure infinispan .. bailing out ");
259 logger.error("Stack Trace that raised th exception");
260 logger.error("",ioe);
264 logger.debug("Cache Manager has value {}", this.cm);
268 logger.info("Stopping the ClusterManager");
269 if (this.cm != null) {
270 logger.info("Found a valid ClusterManager, now let it be stopped");
274 if (this.gossiper != null) {
275 this.gossiper.stop();
276 this.gossiper = null;
281 public ConcurrentMap<?, ?> createCache(String containerName,
282 String cacheName, Set<cacheMode> cMode) throws CacheExistException,
283 CacheConfigException {
284 EmbeddedCacheManager manager = this.cm;
285 Cache<Object,Object> c;
286 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
287 if (manager == null) {
291 if (manager.cacheExists(realCacheName)) {
292 throw new CacheExistException();
295 // Sanity check to avoid contrasting parameters
296 if (cMode.containsAll(EnumSet.of(
297 IClusterServices.cacheMode.NON_TRANSACTIONAL,
298 IClusterServices.cacheMode.TRANSACTIONAL))) {
299 throw new CacheConfigException();
302 if (cMode.contains(IClusterServices.cacheMode.NON_TRANSACTIONAL)) {
303 c = manager.getCache(realCacheName);
305 } else if (cMode.contains(IClusterServices.cacheMode.TRANSACTIONAL)) {
306 Configuration rc = manager
307 .getCacheConfiguration("transactional-type");
308 manager.defineConfiguration(realCacheName, rc);
309 c = manager.getCache(realCacheName);
316 public ConcurrentMap<?, ?> getCache(String containerName, String cacheName) {
317 EmbeddedCacheManager manager = this.cm;
318 Cache<Object,Object> c;
319 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
320 if (manager == null) {
324 if (manager.cacheExists(realCacheName)) {
325 c = manager.getCache(realCacheName);
332 public void destroyCache(String containerName, String cacheName) {
333 EmbeddedCacheManager manager = this.cm;
334 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
335 if (manager == null) {
338 if (manager.cacheExists(realCacheName)) {
339 manager.removeCache(realCacheName);
344 public boolean existCache(String containerName, String cacheName) {
345 EmbeddedCacheManager manager = this.cm;
346 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
347 if (manager == null) {
350 return manager.cacheExists(realCacheName);
354 public Set<String> getCacheList(String containerName) {
355 Set<String> perContainerCaches = new HashSet<String>();
356 EmbeddedCacheManager manager = this.cm;
357 if (manager == null) {
360 for (String cacheName : manager.getCacheNames()) {
361 if (cacheName.startsWith("{" + containerName + "}_")) {
362 String[] res = cacheName.split("[{}]");
363 if (res.length >= 4 && res[1].equals(containerName)
364 && res[2].equals("_")) {
365 perContainerCaches.add(res[3]);
370 return (perContainerCaches);
374 public Properties getCacheProperties(String containerName, String cacheName) {
375 EmbeddedCacheManager manager = this.cm;
376 if (manager == null) {
379 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
380 if (!manager.cacheExists(realCacheName)) {
383 Configuration conf = manager.getCache(realCacheName).getAdvancedCache()
384 .getCacheConfiguration();
385 Properties p = new Properties();
386 p.setProperty(IClusterServices.cacheProps.TRANSACTION_PROP.toString(),
387 conf.transaction().toString());
388 p.setProperty(IClusterServices.cacheProps.CLUSTERING_PROP.toString(),
389 conf.clustering().toString());
390 p.setProperty(IClusterServices.cacheProps.LOCKING_PROP.toString(), conf
391 .locking().toString());
396 public void addListener(String containerName, String cacheName,
397 IGetUpdates<?, ?> u) throws CacheListenerAddException {
398 EmbeddedCacheManager manager = this.cm;
399 Cache<Object,Object> c;
400 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
401 if (manager == null) {
405 if (!manager.cacheExists(realCacheName)) {
406 throw new CacheListenerAddException();
408 c = manager.getCache(realCacheName);
409 CacheListenerContainer cl = new CacheListenerContainer(u,
410 containerName, cacheName);
415 public Set<IGetUpdates<?, ?>> getListeners(String containerName,
417 EmbeddedCacheManager manager = this.cm;
418 Cache<Object,Object> c;
419 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
420 if (manager == null) {
424 if (!manager.cacheExists(realCacheName)) {
427 c = manager.getCache(realCacheName);
429 Set<IGetUpdates<?, ?>> res = new HashSet<IGetUpdates<?, ?>>();
430 Set<Object> listeners = c.getListeners();
431 for (Object listener : listeners) {
432 if (listener instanceof CacheListenerContainer) {
433 CacheListenerContainer cl = (CacheListenerContainer) listener;
434 res.add(cl.whichListener());
442 public void removeListener(String containerName, String cacheName,
443 IGetUpdates<?, ?> u) {
444 EmbeddedCacheManager manager = this.cm;
445 Cache<Object,Object> c;
446 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
447 if (manager == null) {
451 if (!manager.cacheExists(realCacheName)) {
454 c = manager.getCache(realCacheName);
456 Set<Object> listeners = c.getListeners();
457 for (Object listener : listeners) {
458 if (listener instanceof CacheListenerContainer) {
459 CacheListenerContainer cl = (CacheListenerContainer) listener;
460 if (cl.whichListener() == u) {
461 c.removeListener(listener);
469 public void tbegin() throws NotSupportedException, SystemException {
470 EmbeddedCacheManager manager = this.cm;
471 if (manager == null) {
472 throw new IllegalStateException();
474 TransactionManager tm = manager.getCache("transactional-type")
475 .getAdvancedCache().getTransactionManager();
477 throw new IllegalStateException();
483 public void tcommit() throws RollbackException, HeuristicMixedException,
484 HeuristicRollbackException, java.lang.SecurityException,
485 java.lang.IllegalStateException, SystemException {
486 EmbeddedCacheManager manager = this.cm;
487 if (manager == null) {
488 throw new IllegalStateException();
490 TransactionManager tm = manager.getCache("transactional-type")
491 .getAdvancedCache().getTransactionManager();
493 throw new IllegalStateException();
499 public void trollback() throws java.lang.IllegalStateException,
500 java.lang.SecurityException, SystemException {
501 EmbeddedCacheManager manager = this.cm;
502 if (manager == null) {
503 throw new IllegalStateException();
505 TransactionManager tm = manager.getCache("transactional-type")
506 .getAdvancedCache().getTransactionManager();
508 throw new IllegalStateException();
514 public Transaction tgetTransaction() throws SystemException {
515 EmbeddedCacheManager manager = this.cm;
516 if (manager == null) {
517 throw new IllegalStateException();
519 TransactionManager tm = manager.getCache("transactional-type")
520 .getAdvancedCache().getTransactionManager();
524 return tm.getTransaction();
528 public boolean amIStandby() {
529 EmbeddedCacheManager manager = this.cm;
530 if (manager == null) {
531 // In case we cannot fetch the information, lets assume we
532 // are standby, so to have less responsibility.
535 return (!manager.isCoordinator());
538 private InetAddress addressToInetAddress(Address a) {
539 EmbeddedCacheManager manager = this.cm;
540 if ((manager == null) || (a == null)) {
541 // In case we cannot fetch the information, lets assume we
542 // are standby, so to have less responsibility.
545 Transport t = manager.getTransport();
546 if (t instanceof JGroupsTransport) {
547 JGroupsTransport jt = (JGroupsTransport) t;
548 Channel c = jt.getChannel();
549 if (a instanceof JGroupsAddress) {
550 JGroupsAddress ja = (JGroupsAddress) a;
551 org.jgroups.Address phys = (org.jgroups.Address) c
552 .down(new Event(Event.GET_PHYSICAL_ADDRESS, ja
553 .getJGroupsAddress()));
554 if (phys instanceof org.jgroups.stack.IpAddress) {
555 InetAddress bindAddress = ((org.jgroups.stack.IpAddress) phys)
565 public List<InetAddress> getClusteredControllers() {
566 EmbeddedCacheManager manager = this.cm;
567 if (manager == null) {
570 List<Address> controllers = manager.getMembers();
571 if ((controllers == null) || controllers.size() == 0) {
575 List<InetAddress> clusteredControllers = new ArrayList<InetAddress>();
576 for (Address a : controllers) {
577 InetAddress inetAddress = addressToInetAddress(a);
578 if (inetAddress != null
579 && !inetAddress.getHostAddress().equals(loopbackAddress)) {
580 clusteredControllers.add(inetAddress);
583 return clusteredControllers;
587 public InetAddress getMyAddress() {
588 EmbeddedCacheManager manager = this.cm;
589 if (manager == null) {
592 return addressToInetAddress(manager.getAddress());
596 public InetAddress getActiveAddress() {
597 EmbeddedCacheManager manager = this.cm;
598 if (manager == null) {
599 // In case we cannot fetch the information, lets assume we
600 // are standby, so to have less responsibility.
604 return addressToInetAddress(manager.getCoordinator());
608 public void listenRoleChange(IListenRoleChange i)
609 throws ListenRoleChangeAddException {
610 EmbeddedCacheManager manager = this.cm;
611 if (manager == null) {
612 // In case we cannot fetch the information, lets assume we
613 // are standby, so to have less responsibility.
614 throw new ListenRoleChangeAddException();
617 if (this.roleChangeListeners == null) {
618 this.roleChangeListeners = new HashSet<IListenRoleChange>();
619 this.cacheManagerListener = new ViewChangedListener(
620 this.roleChangeListeners);
621 manager.addListener(this.cacheManagerListener);
624 if (this.roleChangeListeners != null) {
625 this.roleChangeListeners.add(i);
630 public void unlistenRoleChange(IListenRoleChange i) {
631 EmbeddedCacheManager manager = this.cm;
632 if (manager == null) {
633 // In case we cannot fetch the information, lets assume we
634 // are standby, so to have less responsibility.
638 if (this.roleChangeListeners != null) {
639 this.roleChangeListeners.remove(i);
642 if ((this.roleChangeListeners != null && this.roleChangeListeners
644 && (this.cacheManagerListener != null)) {
645 manager.removeListener(this.cacheManagerListener);
646 this.cacheManagerListener = null;
647 this.roleChangeListeners = null;
652 public class ViewChangedListener {
653 Set<IListenRoleChange> roleListeners;
655 public ViewChangedListener(Set<IListenRoleChange> s) {
656 this.roleListeners = s;
660 public void viewChanged(ViewChangedEvent e) {
661 for (IListenRoleChange i : this.roleListeners) {
662 i.newActiveAvailable();
667 private void removeContainerCaches(String containerName) {
668 logger.info("Destroying caches for container {}", containerName);
669 for (String cacheName : this.getCacheList(containerName)) {
670 this.destroyCache(containerName, cacheName);
675 public void containerCreate(String arg0) {
680 public void containerDestroy(String container) {
681 removeContainerCaches(container);