Merge "Removing the Container Aware dependency from Clustering Services."
[controller.git] / opendaylight / clustering / services_implementation / src / main / java / org / opendaylight / controller / clustering / services_implementation / internal / ClusterManager.java
1
2 /*
3  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9
10 package org.opendaylight.controller.clustering.services_implementation.internal;
11
12 import java.net.InetAddress;
13 import java.net.NetworkInterface;
14 import java.net.SocketException;
15 import java.net.UnknownHostException;
16 import java.util.ArrayList;
17 import java.util.EnumSet;
18 import java.util.Enumeration;
19 import java.util.HashSet;
20 import java.util.List;
21 import java.util.Properties;
22 import java.util.Set;
23 import java.util.StringTokenizer;
24 import java.util.concurrent.ConcurrentMap;
25
26 import javax.transaction.HeuristicMixedException;
27 import javax.transaction.HeuristicRollbackException;
28 import javax.transaction.NotSupportedException;
29 import javax.transaction.RollbackException;
30 import javax.transaction.SystemException;
31 import javax.transaction.Transaction;
32 import javax.transaction.TransactionManager;
33
34 import org.infinispan.Cache;
35 import org.infinispan.configuration.cache.Configuration;
36 import org.infinispan.configuration.global.GlobalConfigurationBuilder;
37 import org.infinispan.configuration.parsing.ConfigurationBuilderHolder;
38 import org.infinispan.configuration.parsing.ParserRegistry;
39 import org.infinispan.manager.DefaultCacheManager;
40 import org.infinispan.manager.EmbeddedCacheManager;
41 import org.infinispan.notifications.Listener;
42 import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
43 import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
44 import org.infinispan.remoting.transport.Address;
45 import org.infinispan.remoting.transport.Transport;
46 import org.infinispan.remoting.transport.jgroups.JGroupsAddress;
47 import org.infinispan.remoting.transport.jgroups.JGroupsTransport;
48 import org.jgroups.Channel;
49 import org.jgroups.Event;
50 import org.jgroups.stack.GossipRouter;
51 import org.opendaylight.controller.clustering.services.CacheConfigException;
52 import org.opendaylight.controller.clustering.services.CacheExistException;
53 import org.opendaylight.controller.clustering.services.CacheListenerAddException;
54 import org.opendaylight.controller.clustering.services.IClusterServices;
55 import org.opendaylight.controller.clustering.services.IGetUpdates;
56 import org.opendaylight.controller.clustering.services.IListenRoleChange;
57 import org.opendaylight.controller.clustering.services.ListenRoleChangeAddException;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
60
61 public class ClusterManager implements IClusterServices {
62     protected static final Logger logger = LoggerFactory
63             .getLogger(ClusterManager.class);
64     private DefaultCacheManager cm;
65     GossipRouter gossiper;
66     private HashSet<IListenRoleChange> roleChangeListeners;
67     private ViewChangedListener cacheManagerListener;
68
69     private static String loopbackAddress = "127.0.0.1";
70
71     /**
72      * Start a JGroups GossipRouter if we are a supernode. The
73      * GosispRouter is nothing more than a simple
74      * rendevouz-pointer. All the nodes that wants to join the cluster
75      * will come to any of the rendevouz point and they introduce the
76      * nodes to all the others. Once the meet and greet phase if over,
77      * the nodes will open a full-mesh with the remaining n-1 nodes,
78      * so even if the GossipRouter goes down nothing is lost.
79      * NOTE: This function has the side effect to set some of the
80      * JGROUPS configurations, this because in this function already
81      * we try to retrieve some of the network capabilities of the
82      * device and so it's better not to do that again
83      *
84      *
85      * @return GossipRouter
86      */
87     private GossipRouter startGossiper() {
88         boolean amIGossipRouter = false;
89         Integer gossipRouterPortDefault = 12001;
90         Integer gossipRouterPort = gossipRouterPortDefault;
91         InetAddress gossipRouterAddress = null;
92         String supernodes_list = System.getProperty("supernodes",
93                 loopbackAddress);
94         StringBuffer sanitized_supernodes_list = new StringBuffer();
95         List<InetAddress> myAddresses = new ArrayList<InetAddress>();
96
97         StringTokenizer supernodes = new StringTokenizer(supernodes_list, ":");
98         if (supernodes.hasMoreTokens()) {
99             // Populate the list of my addresses
100             try {
101                 Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces();
102                 while (e.hasMoreElements()) {
103                     NetworkInterface n = e.nextElement();
104                     Enumeration<InetAddress> ee = n.getInetAddresses();
105                     while (ee.hasMoreElements()) {
106                         InetAddress i = ee.nextElement();
107                         myAddresses.add(i);
108                     }
109                 }
110             } catch (SocketException se) {
111                 logger.error("Cannot get the list of network interfaces");
112                 return null;
113             }
114         }
115         while (supernodes.hasMoreTokens()) {
116             String curr_supernode = supernodes.nextToken();
117             logger.debug("Examining supernode {}", curr_supernode);
118             StringTokenizer host_port = new StringTokenizer(curr_supernode,
119                     "[]");
120             String host;
121             String port;
122             Integer port_num = gossipRouterPortDefault;
123             if (host_port.countTokens() > 2) {
124                 logger.error("Error parsing supernode {} proceed to the next one",
125                         curr_supernode);
126                 continue;
127             }
128             host = host_port.nextToken();
129             InetAddress hostAddr;
130             try {
131                 hostAddr = InetAddress.getByName(host);
132             } catch (UnknownHostException ue) {
133                 logger.error("Host not known");
134                 continue;
135             }
136             if (host_port.hasMoreTokens()) {
137                 port = host_port.nextToken();
138                 try {
139                     port_num = Integer.valueOf(port);
140                 } catch (NumberFormatException ne) {
141                     logger
142                             .error("Supplied supernode gossiepr port is not recognized, using standard gossipport");
143                     port_num = gossipRouterPortDefault;
144                 }
145                 if ((port_num > 65535) || (port_num < 0)) {
146                     logger
147                             .error("Supplied supernode gossip port is outside a valid TCP port range");
148                     port_num = gossipRouterPortDefault;
149                 }
150             }
151             if (!amIGossipRouter) {
152                 if (host != null) {
153                     for (InetAddress myAddr : myAddresses) {
154                         if (myAddr.equals(hostAddr)) {
155                             amIGossipRouter = true;
156                             gossipRouterAddress = hostAddr;
157                             gossipRouterPort = port_num;
158                             break;
159                         }
160                     }
161                 }
162             }
163             if (!sanitized_supernodes_list.toString().equals("")) {
164                 sanitized_supernodes_list.append(",");
165             }
166             sanitized_supernodes_list.append(hostAddr.getHostAddress() + "["
167                     + port_num + "]");
168         }
169
170         if (amIGossipRouter) {
171             // Set the Jgroups binding interface to the one we got
172             // from the supernodes attribute
173             if (gossipRouterAddress != null) {
174                 System.setProperty("jgroups.tcp.address", gossipRouterAddress
175                         .getHostAddress());
176             }
177         } else {
178             // Set the Jgroup binding interface to the one we are well
179             // known outside or else to the first with non-local
180             // scope.
181             try {
182                 String myBind = InetAddress.getLocalHost().getHostAddress();
183                 if (myBind == null
184                         || InetAddress.getLocalHost().isLoopbackAddress()) {
185                     for (InetAddress myAddr : myAddresses) {
186                         if (myAddr.isLoopbackAddress()
187                                 || myAddr.isLinkLocalAddress()) {
188                             logger.debug("Skipping local address {}",
189                                          myAddr.getHostAddress());
190                             continue;
191                         } else {
192                             // First non-local address
193                             myBind = myAddr.getHostAddress();
194                             logger.debug("First non-local address {}", myBind);
195                             break;
196                         }
197                     }
198                 }
199                 String jgroupAddress = System
200                         .getProperty("jgroups.tcp.address");
201                 if (jgroupAddress == null) {
202                     if (myBind != null) {
203                         logger.debug("Set bind address to be {}", myBind);
204                         System.setProperty("jgroups.tcp.address", myBind);
205                     } else {
206                         logger
207                                 .debug("Set bind address to be LOCALHOST=127.0.0.1");
208                         System.setProperty("jgroups.tcp.address", "127.0.0.1");
209                     }
210                 } else {
211                     logger.debug("jgroup.tcp.address already set to be {}",
212                             jgroupAddress);
213                 }
214             } catch (UnknownHostException uhe) {
215                 logger
216                         .error("Met UnknownHostException while trying to get binding address for jgroups");
217             }
218         }
219
220         // The supernodes list constitute also the tcpgossip initial
221         // host list
222         System.setProperty("jgroups.tcpgossip.initial_hosts",
223                 sanitized_supernodes_list.toString());
224         logger.debug("jgroups.tcp.address set to {}",
225                 System.getProperty("jgroups.tcp.address"));
226         logger.debug("jgroups.tcpgossip.initial_hosts set to {}",
227                 System.getProperty("jgroups.tcpgossip.initial_hosts"));
228         GossipRouter res = null;
229         if (amIGossipRouter) {
230             logger.info("I'm a GossipRouter will listen on port {}",
231                     gossipRouterPort);
232             // Start a GossipRouter with JMX support
233             res = new GossipRouter(gossipRouterPort, null, true);
234         }
235         return res;
236     }
237
238     public void start() {
239         this.gossiper = startGossiper();
240         if (this.gossiper != null) {
241             logger.debug("Trying to start Gossiper");
242             try {
243                 this.gossiper.start();
244                 logger.info("Started GossipRouter");
245             } catch (Exception e) {
246                 logger.error("GossipRouter didn't start. Exception Stack Trace",
247                              e);
248             }
249         }
250         logger.info("Starting the ClusterManager");
251         try {
252             ParserRegistry parser = new ParserRegistry(this.getClass()
253                     .getClassLoader());
254             String infinispanConfigFile =
255                     System.getProperty("org.infinispan.config.file", "config/infinispan-config.xml");
256             logger.debug("Using configuration file:{}", infinispanConfigFile);
257             ConfigurationBuilderHolder holder = parser.parseFile(infinispanConfigFile);
258             GlobalConfigurationBuilder globalBuilder = holder.getGlobalConfigurationBuilder();
259             globalBuilder.serialization()
260                     .classResolver(new ClassResolver())
261                     .build();
262             this.cm = new DefaultCacheManager(holder, false);
263             logger.debug("Allocated ClusterManager");
264             if (this.cm != null) {
265                 this.cm.start();
266                 this.cm.startCache();
267                 logger.debug("Started the ClusterManager");
268             }
269         } catch (Exception ioe) {
270             logger.error("Cannot configure infinispan .. bailing out ");
271             logger.error("Stack Trace that raised th exception");
272             logger.error("",ioe);
273             this.cm = null;
274             this.stop();
275         }
276         logger.debug("Cache Manager has value {}", this.cm);
277     }
278
279     public void stop() {
280         logger.info("Stopping the ClusterManager");
281         if (this.cm != null) {
282             logger.info("Found a valid ClusterManager, now let it be stopped");
283             this.cm.stop();
284             this.cm = null;
285         }
286         if (this.gossiper != null) {
287             this.gossiper.stop();
288             this.gossiper = null;
289         }
290     }
291
292     @Override
293     public ConcurrentMap<?, ?> createCache(String containerName,
294             String cacheName, Set<cacheMode> cMode) throws CacheExistException,
295             CacheConfigException {
296         EmbeddedCacheManager manager = this.cm;
297         Cache<Object,Object> c;
298         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
299         if (manager == null) {
300             return null;
301         }
302
303         if (manager.cacheExists(realCacheName)) {
304             throw new CacheExistException();
305         }
306
307         // Sanity check to avoid contrasting parameters
308         if (cMode.containsAll(EnumSet.of(
309                 IClusterServices.cacheMode.NON_TRANSACTIONAL,
310                 IClusterServices.cacheMode.TRANSACTIONAL))) {
311             throw new CacheConfigException();
312         }
313
314         if (cMode.contains(IClusterServices.cacheMode.NON_TRANSACTIONAL)) {
315             c = manager.getCache(realCacheName);
316             return c;
317         } else if (cMode.contains(IClusterServices.cacheMode.TRANSACTIONAL)) {
318             Configuration rc = manager
319                     .getCacheConfiguration("transactional-type");
320             manager.defineConfiguration(realCacheName, rc);
321             c = manager.getCache(realCacheName);
322             return c;
323         }
324         return null;
325     }
326
327     @Override
328     public ConcurrentMap<?, ?> getCache(String containerName, String cacheName) {
329         EmbeddedCacheManager manager = this.cm;
330         Cache<Object,Object> c;
331         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
332         if (manager == null) {
333             return null;
334         }
335
336         if (manager.cacheExists(realCacheName)) {
337             c = manager.getCache(realCacheName);
338             return c;
339         }
340         return null;
341     }
342
343     @Override
344     public void destroyCache(String containerName, String cacheName) {
345         EmbeddedCacheManager manager = this.cm;
346         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
347         if (manager == null) {
348             return;
349         }
350         if (manager.cacheExists(realCacheName)) {
351             manager.removeCache(realCacheName);
352         }
353     }
354
355     @Override
356     public boolean existCache(String containerName, String cacheName) {
357         EmbeddedCacheManager manager = this.cm;
358         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
359         if (manager == null) {
360             return false;
361         }
362         return manager.cacheExists(realCacheName);
363     }
364
365     @Override
366     public Set<String> getCacheList(String containerName) {
367         Set<String> perContainerCaches = new HashSet<String>();
368         EmbeddedCacheManager manager = this.cm;
369         if (manager == null) {
370             return null;
371         }
372         for (String cacheName : manager.getCacheNames()) {
373             if (!manager.isRunning(cacheName)) continue;
374             if (cacheName.startsWith("{" + containerName + "}_")) {
375                 String[] res = cacheName.split("[{}]");
376                 if (res.length >= 4 && res[1].equals(containerName)
377                         && res[2].equals("_")) {
378                     perContainerCaches.add(res[3]);
379                 }
380             }
381         }
382
383         return (perContainerCaches);
384     }
385
386     @Override
387     public Properties getCacheProperties(String containerName, String cacheName) {
388         EmbeddedCacheManager manager = this.cm;
389         if (manager == null) {
390             return null;
391         }
392         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
393         if (!manager.cacheExists(realCacheName)) {
394             return null;
395         }
396         Configuration conf = manager.getCache(realCacheName).getAdvancedCache()
397                 .getCacheConfiguration();
398         Properties p = new Properties();
399         p.setProperty(IClusterServices.cacheProps.TRANSACTION_PROP.toString(),
400                 conf.transaction().toString());
401         p.setProperty(IClusterServices.cacheProps.CLUSTERING_PROP.toString(),
402                 conf.clustering().toString());
403         p.setProperty(IClusterServices.cacheProps.LOCKING_PROP.toString(), conf
404                 .locking().toString());
405         return p;
406     }
407
408     @Override
409     public void addListener(String containerName, String cacheName,
410             IGetUpdates<?, ?> u) throws CacheListenerAddException {
411         EmbeddedCacheManager manager = this.cm;
412         Cache<Object,Object> c;
413         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
414         if (manager == null) {
415             return;
416         }
417
418         if (!manager.cacheExists(realCacheName)) {
419             throw new CacheListenerAddException();
420         }
421         c = manager.getCache(realCacheName);
422         CacheListenerContainer cl = new CacheListenerContainer(u,
423                 containerName, cacheName);
424         c.addListener(cl);
425     }
426
427     @Override
428     public Set<IGetUpdates<?, ?>> getListeners(String containerName,
429             String cacheName) {
430         EmbeddedCacheManager manager = this.cm;
431         Cache<Object,Object> c;
432         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
433         if (manager == null) {
434             return null;
435         }
436
437         if (!manager.cacheExists(realCacheName)) {
438             return null;
439         }
440         c = manager.getCache(realCacheName);
441
442         Set<IGetUpdates<?, ?>> res = new HashSet<IGetUpdates<?, ?>>();
443         Set<Object> listeners = c.getListeners();
444         for (Object listener : listeners) {
445             if (listener instanceof CacheListenerContainer) {
446                 CacheListenerContainer cl = (CacheListenerContainer) listener;
447                 res.add(cl.whichListener());
448             }
449         }
450
451         return res;
452     }
453
454     @Override
455     public void removeListener(String containerName, String cacheName,
456             IGetUpdates<?, ?> u) {
457         EmbeddedCacheManager manager = this.cm;
458         Cache<Object,Object> c;
459         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
460         if (manager == null) {
461             return;
462         }
463
464         if (!manager.cacheExists(realCacheName)) {
465             return;
466         }
467         c = manager.getCache(realCacheName);
468
469         Set<Object> listeners = c.getListeners();
470         for (Object listener : listeners) {
471             if (listener instanceof CacheListenerContainer) {
472                 CacheListenerContainer cl = (CacheListenerContainer) listener;
473                 if (cl.whichListener() == u) {
474                     c.removeListener(listener);
475                     return;
476                 }
477             }
478         }
479     }
480
481     @Override
482     public void tbegin() throws NotSupportedException, SystemException {
483         EmbeddedCacheManager manager = this.cm;
484         if (manager == null) {
485             throw new IllegalStateException();
486         }
487         TransactionManager tm = manager.getCache("transactional-type")
488                 .getAdvancedCache().getTransactionManager();
489         if (tm == null) {
490             throw new IllegalStateException();
491         }
492         tm.begin();
493     }
494
495     @Override
496     public void tcommit() throws RollbackException, HeuristicMixedException,
497             HeuristicRollbackException, java.lang.SecurityException,
498             java.lang.IllegalStateException, SystemException {
499         EmbeddedCacheManager manager = this.cm;
500         if (manager == null) {
501             throw new IllegalStateException();
502         }
503         TransactionManager tm = manager.getCache("transactional-type")
504                 .getAdvancedCache().getTransactionManager();
505         if (tm == null) {
506             throw new IllegalStateException();
507         }
508         tm.commit();
509     }
510
511     @Override
512     public void trollback() throws java.lang.IllegalStateException,
513             java.lang.SecurityException, SystemException {
514         EmbeddedCacheManager manager = this.cm;
515         if (manager == null) {
516             throw new IllegalStateException();
517         }
518         TransactionManager tm = manager.getCache("transactional-type")
519                 .getAdvancedCache().getTransactionManager();
520         if (tm == null) {
521             throw new IllegalStateException();
522         }
523         tm.rollback();
524     }
525
526     @Override
527     public Transaction tgetTransaction() throws SystemException {
528         EmbeddedCacheManager manager = this.cm;
529         if (manager == null) {
530             throw new IllegalStateException();
531         }
532         TransactionManager tm = manager.getCache("transactional-type")
533                 .getAdvancedCache().getTransactionManager();
534         if (tm == null) {
535             return null;
536         }
537         return tm.getTransaction();
538     }
539
540     @Override
541     public boolean amIStandby() {
542         EmbeddedCacheManager manager = this.cm;
543         if (manager == null) {
544             // In case we cannot fetch the information, lets assume we
545             // are standby, so to have less responsibility.
546             return true;
547         }
548         return (!manager.isCoordinator());
549     }
550
551     private InetAddress addressToInetAddress(Address a) {
552         EmbeddedCacheManager manager = this.cm;
553         if ((manager == null) || (a == null)) {
554             // In case we cannot fetch the information, lets assume we
555             // are standby, so to have less responsibility.
556             return null;
557         }
558         Transport t = manager.getTransport();
559         if (t instanceof JGroupsTransport) {
560             JGroupsTransport jt = (JGroupsTransport) t;
561             Channel c = jt.getChannel();
562             if (a instanceof JGroupsAddress) {
563                 JGroupsAddress ja = (JGroupsAddress) a;
564                 org.jgroups.Address phys = (org.jgroups.Address) c
565                         .down(new Event(Event.GET_PHYSICAL_ADDRESS, ja
566                                 .getJGroupsAddress()));
567                 if (phys instanceof org.jgroups.stack.IpAddress) {
568                     InetAddress bindAddress = ((org.jgroups.stack.IpAddress) phys)
569                             .getIpAddress();
570                     return bindAddress;
571                 }
572             }
573         }
574         return null;
575     }
576
577     @Override
578     public List<InetAddress> getClusteredControllers() {
579         EmbeddedCacheManager manager = this.cm;
580         if (manager == null) {
581             return null;
582         }
583         List<Address> controllers = manager.getMembers();
584         if ((controllers == null) || controllers.size() == 0) {
585             return null;
586         }
587
588         List<InetAddress> clusteredControllers = new ArrayList<InetAddress>();
589         for (Address a : controllers) {
590             InetAddress inetAddress = addressToInetAddress(a);
591             if (inetAddress != null
592                     && !inetAddress.getHostAddress().equals(loopbackAddress)) {
593                 clusteredControllers.add(inetAddress);
594             }
595         }
596         return clusteredControllers;
597     }
598
599     @Override
600     public InetAddress getMyAddress() {
601         EmbeddedCacheManager manager = this.cm;
602         if (manager == null) {
603             return null;
604         }
605         return addressToInetAddress(manager.getAddress());
606     }
607
608     @Override
609     public InetAddress getActiveAddress() {
610         EmbeddedCacheManager manager = this.cm;
611         if (manager == null) {
612             // In case we cannot fetch the information, lets assume we
613             // are standby, so to have less responsibility.
614             return null;
615         }
616
617         return addressToInetAddress(manager.getCoordinator());
618     }
619
620     @Override
621     public void listenRoleChange(IListenRoleChange i)
622             throws ListenRoleChangeAddException {
623         EmbeddedCacheManager manager = this.cm;
624         if (manager == null) {
625             // In case we cannot fetch the information, lets assume we
626             // are standby, so to have less responsibility.
627             throw new ListenRoleChangeAddException();
628         }
629
630         if (this.roleChangeListeners == null) {
631             this.roleChangeListeners = new HashSet<IListenRoleChange>();
632             this.cacheManagerListener = new ViewChangedListener(
633                     this.roleChangeListeners);
634             manager.addListener(this.cacheManagerListener);
635         }
636
637         if (this.roleChangeListeners != null) {
638             this.roleChangeListeners.add(i);
639         }
640     }
641
642     @Override
643     public void unlistenRoleChange(IListenRoleChange i) {
644         EmbeddedCacheManager manager = this.cm;
645         if (manager == null) {
646             // In case we cannot fetch the information, lets assume we
647             // are standby, so to have less responsibility.
648             return;
649         }
650
651         if (this.roleChangeListeners != null) {
652             this.roleChangeListeners.remove(i);
653         }
654
655         if ((this.roleChangeListeners != null && this.roleChangeListeners
656                 .isEmpty())
657                 && (this.cacheManagerListener != null)) {
658             manager.removeListener(this.cacheManagerListener);
659             this.cacheManagerListener = null;
660             this.roleChangeListeners = null;
661         }
662     }
663
664     @Listener
665     public class ViewChangedListener {
666         Set<IListenRoleChange> roleListeners;
667
668         public ViewChangedListener(Set<IListenRoleChange> s) {
669             this.roleListeners = s;
670         }
671
672         @ViewChanged
673         public void viewChanged(ViewChangedEvent e) {
674             for (IListenRoleChange i : this.roleListeners) {
675                 i.newActiveAvailable();
676             }
677         }
678     }
679 }