Initial opendaylight infrastructure commit!!
[controller.git] / opendaylight / clustering / services_implementation / src / main / java / org / opendaylight / controller / clustering / services_implementation / internal / ClusterManager.java
1
2 /*
3  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9
10 package org.opendaylight.controller.clustering.services_implementation.internal;
11
12 import java.io.PrintWriter;
13 import java.io.StringWriter;
14 import java.net.InetAddress;
15 import java.net.NetworkInterface;
16 import java.net.SocketException;
17 import java.net.UnknownHostException;
18 import java.util.ArrayList;
19 import java.util.EnumSet;
20 import java.util.Enumeration;
21 import java.util.HashSet;
22 import java.util.List;
23 import java.util.Properties;
24 import java.util.Set;
25 import java.util.StringTokenizer;
26 import java.util.concurrent.ConcurrentMap;
27
28 import javax.transaction.HeuristicMixedException;
29 import javax.transaction.HeuristicRollbackException;
30 import javax.transaction.NotSupportedException;
31 import javax.transaction.RollbackException;
32 import javax.transaction.SystemException;
33 import javax.transaction.Transaction;
34 import javax.transaction.TransactionManager;
35
36 import org.infinispan.Cache;
37 import org.infinispan.configuration.cache.Configuration;
38 import org.infinispan.manager.DefaultCacheManager;
39 import org.infinispan.manager.EmbeddedCacheManager;
40 import org.infinispan.notifications.Listener;
41 import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
42 import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
43 import org.infinispan.remoting.transport.Address;
44 import org.infinispan.remoting.transport.Transport;
45 import org.infinispan.remoting.transport.jgroups.JGroupsAddress;
46 import org.infinispan.remoting.transport.jgroups.JGroupsTransport;
47 import org.jgroups.Channel;
48 import org.jgroups.Event;
49 import org.jgroups.stack.GossipRouter;
50 import org.opendaylight.controller.clustering.services.CacheConfigException;
51 import org.opendaylight.controller.clustering.services.CacheExistException;
52 import org.opendaylight.controller.clustering.services.CacheListenerAddException;
53 import org.opendaylight.controller.clustering.services.IClusterServices;
54 import org.opendaylight.controller.clustering.services.IGetUpdates;
55 import org.opendaylight.controller.clustering.services.IListenRoleChange;
56 import org.opendaylight.controller.clustering.services.ListenRoleChangeAddException;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
59
60 public class ClusterManager implements IClusterServices {
61     protected static final Logger logger = LoggerFactory
62             .getLogger(ClusterManager.class);
63     private DefaultCacheManager cm;
64     GossipRouter gossiper;
65     private HashSet roleChangeListeners;
66     private ViewChangedListener cacheManagerListener;
67
68     private static String loopbackAddress = "127.0.0.1";
69
70     /**
71      * Start a JGroups GossipRouter if we are a supernode. The
72      * GosispRouter is nothing more than a simple
73      * rendevouz-pointer. All the nodes that wants to join the cluster
74      * will come to any of the rendevouz point and they introduce the
75      * nodes to all the others. Once the meet and greet phase if over,
76      * the nodes will open a full-mesh with the remaining n-1 nodes,
77      * so even if the GossipRouter goes down nothing is lost.
78      * NOTE: This function has the side effect to set some of the
79      * JGROUPS configurations, this because in this function already
80      * we try to retrieve some of the network capabilities of the
81      * device and so it's better not to do that again
82      *
83      *
84      * @return GossipRouter
85      */
86     private GossipRouter startGossiper() {
87         boolean amIGossipRouter = false;
88         Integer gossipRouterPortDefault = 12001;
89         Integer gossipRouterPort = gossipRouterPortDefault;
90         InetAddress gossipRouterAddress = null;
91         String supernodes_list = System.getProperty("supernodes",
92                 loopbackAddress);
93         StringBuffer sanitized_supernodes_list = new StringBuffer();
94         List<InetAddress> myAddresses = new ArrayList<InetAddress>();
95
96         StringTokenizer supernodes = new StringTokenizer(supernodes_list, ":");
97         if (supernodes.hasMoreTokens()) {
98             // Populate the list of my addresses
99             try {
100                 Enumeration e = NetworkInterface.getNetworkInterfaces();
101                 while (e.hasMoreElements()) {
102                     NetworkInterface n = (NetworkInterface) e.nextElement();
103                     Enumeration ee = n.getInetAddresses();
104                     while (ee.hasMoreElements()) {
105                         InetAddress i = (InetAddress) ee.nextElement();
106                         myAddresses.add(i);
107                     }
108                 }
109             } catch (SocketException se) {
110                 logger.error("Cannot get the list of network interfaces");
111                 return null;
112             }
113         }
114         while (supernodes.hasMoreTokens()) {
115             String curr_supernode = supernodes.nextToken();
116             logger.debug("Examining supernode " + curr_supernode);
117             StringTokenizer host_port = new StringTokenizer(curr_supernode,
118                     "[]");
119             String host;
120             String port;
121             Integer port_num = gossipRouterPortDefault;
122             if (host_port.countTokens() > 2) {
123                 logger.error("Error parsing supernode " + curr_supernode
124                         + " proceed to the next one");
125                 continue;
126             }
127             host = host_port.nextToken();
128             InetAddress hostAddr;
129             try {
130                 hostAddr = InetAddress.getByName(host);
131             } catch (UnknownHostException ue) {
132                 logger.error("Host not known");
133                 continue;
134             }
135             if (host_port.hasMoreTokens()) {
136                 port = host_port.nextToken();
137                 try {
138                     port_num = Integer.valueOf(port);
139                 } catch (NumberFormatException ne) {
140                     logger
141                             .error("Supplied supernode gossiepr port is not recognized, using standard gossipport");
142                     port_num = gossipRouterPortDefault;
143                 }
144                 if ((port_num > 65535) || (port_num < 0)) {
145                     logger
146                             .error("Supplied supernode gossip port is outside a valid TCP port range");
147                     port_num = gossipRouterPortDefault;
148                 }
149             }
150             if (!amIGossipRouter) {
151                 if (host != null) {
152                     for (InetAddress myAddr : myAddresses) {
153                         if (myAddr.equals(hostAddr)) {
154                             amIGossipRouter = true;
155                             gossipRouterAddress = hostAddr;
156                             gossipRouterPort = port_num;
157                             break;
158                         }
159                     }
160                 }
161             }
162             if (!sanitized_supernodes_list.toString().equals("")) {
163                 sanitized_supernodes_list.append(",");
164             }
165             sanitized_supernodes_list.append(hostAddr.getHostAddress() + "["
166                     + port_num + "]");
167         }
168
169         if (amIGossipRouter) {
170             // Set the Jgroups binding interface to the one we got
171             // from the supernodes attribute
172             if (gossipRouterAddress != null) {
173                 System.setProperty("jgroups.tcp.address", gossipRouterAddress
174                         .getHostAddress());
175             }
176         } else {
177             // Set the Jgroup binding interface to the one we are well
178             // known outside or else to the first with non-local
179             // scope.
180             try {
181                 String myBind = InetAddress.getLocalHost().getHostAddress();
182                 if (myBind == null
183                         || InetAddress.getLocalHost().isLoopbackAddress()) {
184                     for (InetAddress myAddr : myAddresses) {
185                         if (myAddr.isLoopbackAddress()
186                                 || myAddr.isLinkLocalAddress()) {
187                             logger.debug("Skipping local address "
188                                     + myAddr.getHostAddress());
189                             continue;
190                         } else {
191                             // First non-local address
192                             myBind = myAddr.getHostAddress();
193                             logger.debug("First non-local address " + myBind);
194                             break;
195                         }
196                     }
197                 }
198                 String jgroupAddress = System
199                         .getProperty("jgroups.tcp.address");
200                 if (jgroupAddress == null) {
201                     if (myBind != null) {
202                         logger.debug("Set bind address to be " + myBind);
203                         System.setProperty("jgroups.tcp.address", myBind);
204                     } else {
205                         logger
206                                 .debug("Set bind address to be LOCALHOST=127.0.0.1");
207                         System.setProperty("jgroups.tcp.address", "127.0.0.1");
208                     }
209                 } else {
210                     logger.debug("jgroup.tcp.address already set to be "
211                             + jgroupAddress);
212                 }
213             } catch (UnknownHostException uhe) {
214                 logger
215                         .error("Met UnknownHostException while trying to get binding address for jgroups");
216             }
217         }
218
219         // The supernodes list constitute also the tcpgossip initial
220         // host list
221         System.setProperty("jgroups.tcpgossip.initial_hosts",
222                 sanitized_supernodes_list.toString());
223         logger.debug("jgroups.tcp.address set to "
224                 + System.getProperty("jgroups.tcp.address"));
225         logger.debug("jgroups.tcpgossip.initial_hosts set to "
226                 + System.getProperty("jgroups.tcpgossip.initial_hosts"));
227         GossipRouter res = null;
228         if (amIGossipRouter) {
229             logger.info("I'm a GossipRouter will listen on port "
230                     + gossipRouterPort);
231             res = new GossipRouter(gossipRouterPort);
232         }
233         return res;
234     }
235
236     public void start() {
237         this.gossiper = startGossiper();
238         if (this.gossiper != null) {
239             logger.debug("Trying to start Gossiper");
240             try {
241                 this.gossiper.start();
242                 logger.info("Started GossipRouter");
243             } catch (Exception e) {
244                 logger.error("GossipRouter didn't start exception " + e
245                         + " met");
246                 StringWriter sw = new StringWriter();
247                 logger.error("Stack Trace that raised the exception");
248                 e.printStackTrace(new PrintWriter(sw));
249                 logger.error(sw.toString());
250             }
251         }
252         logger.info("Starting the ClusterManager");
253         try {
254             //FIXME keeps throwing FileNotFoundException
255             this.cm = new DefaultCacheManager("/config/infinispan-config.xml");
256             logger.debug("Allocated ClusterManager");
257             if (this.cm != null) {
258                 this.cm.start();
259                 this.cm.startCache();
260                 logger.debug("Started the ClusterManager");
261             }
262         } catch (Exception ioe) {
263             StringWriter sw = new StringWriter();
264             logger.error("Cannot configure infinispan .. bailing out ");
265             logger.error("Stack Trace that raised th exception");
266             ioe.printStackTrace(new PrintWriter(sw));
267             logger.error(sw.toString());
268             this.cm = null;
269             this.stop();
270         }
271         logger.debug("Cache Manager has value " + this.cm);
272     }
273
274     public void stop() {
275         logger.info("Stopping the ClusterManager");
276         if (this.cm != null) {
277             logger.info("Found a valid ClusterManager, now let it be stopped");
278             this.cm.stop();
279             this.cm = null;
280         }
281         if (this.gossiper != null) {
282             this.gossiper.stop();
283             this.gossiper = null;
284         }
285     }
286
287     @Override
288     public ConcurrentMap<?, ?> createCache(String containerName,
289             String cacheName, Set<cacheMode> cMode) throws CacheExistException,
290             CacheConfigException {
291         EmbeddedCacheManager manager = this.cm;
292         Cache c;
293         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
294         if (manager == null) {
295             return null;
296         }
297
298         if (manager.cacheExists(realCacheName)) {
299             throw new CacheExistException();
300         }
301
302         // Sanity check to avoid contrasting parameters
303         if (cMode.containsAll(EnumSet.of(
304                 IClusterServices.cacheMode.NON_TRANSACTIONAL,
305                 IClusterServices.cacheMode.TRANSACTIONAL))) {
306             throw new CacheConfigException();
307         }
308
309         if (cMode.contains(IClusterServices.cacheMode.NON_TRANSACTIONAL)) {
310             c = manager.getCache(realCacheName);
311             return c;
312         } else if (cMode.contains(IClusterServices.cacheMode.TRANSACTIONAL)) {
313             Configuration rc = manager
314                     .getCacheConfiguration("transactional-type");
315             manager.defineConfiguration(realCacheName, rc);
316             c = manager.getCache(realCacheName);
317             return c;
318         }
319         return null;
320     }
321
322     @Override
323     public ConcurrentMap<?, ?> getCache(String containerName, String cacheName) {
324         EmbeddedCacheManager manager = this.cm;
325         Cache c;
326         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
327         if (manager == null) {
328             return null;
329         }
330
331         if (manager.cacheExists(realCacheName)) {
332             c = manager.getCache(realCacheName);
333             return c;
334         }
335         return null;
336     }
337
338     @Override
339     public void destroyCache(String containerName, String cacheName) {
340         EmbeddedCacheManager manager = this.cm;
341         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
342         if (manager == null) {
343             return;
344         }
345         if (manager.cacheExists(realCacheName)) {
346             manager.removeCache(realCacheName);
347         }
348     }
349
350     @Override
351     public boolean existCache(String containerName, String cacheName) {
352         EmbeddedCacheManager manager = this.cm;
353         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
354         if (manager == null) {
355             return false;
356         }
357         return manager.cacheExists(realCacheName);
358     }
359
360     @Override
361     public Set<String> getCacheList(String containerName) {
362         Set<String> perContainerCaches = new HashSet();
363         EmbeddedCacheManager manager = this.cm;
364         if (manager == null) {
365             return null;
366         }
367         for (String cacheName : manager.getCacheNames()) {
368             if (cacheName.startsWith("{" + containerName + "}_")) {
369                 String[] res = cacheName.split("[{}]");
370                 if (res.length >= 4 && res[1].equals(containerName)
371                         && res[2].equals("_")) {
372                     perContainerCaches.add(res[3]);
373                 }
374             }
375         }
376
377         return (perContainerCaches);
378     }
379
380     @Override
381     public Properties getCacheProperties(String containerName, String cacheName) {
382         EmbeddedCacheManager manager = this.cm;
383         if (manager == null) {
384             return null;
385         }
386         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
387         if (!manager.cacheExists(realCacheName)) {
388             return null;
389         }
390         Configuration conf = manager.getCache(realCacheName).getAdvancedCache()
391                 .getCacheConfiguration();
392         Properties p = new Properties();
393         p.setProperty(IClusterServices.cacheProps.TRANSACTION_PROP.toString(),
394                 conf.transaction().toString());
395         p.setProperty(IClusterServices.cacheProps.CLUSTERING_PROP.toString(),
396                 conf.clustering().toString());
397         p.setProperty(IClusterServices.cacheProps.LOCKING_PROP.toString(), conf
398                 .locking().toString());
399         return p;
400     }
401
402     @Override
403     public void addListener(String containerName, String cacheName,
404             IGetUpdates<?, ?> u) throws CacheListenerAddException {
405         EmbeddedCacheManager manager = this.cm;
406         Cache c;
407         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
408         if (manager == null) {
409             return;
410         }
411
412         if (!manager.cacheExists(realCacheName)) {
413             throw new CacheListenerAddException();
414         }
415         c = manager.getCache(realCacheName);
416         CacheListenerContainer cl = new CacheListenerContainer(u,
417                 containerName, cacheName);
418         if (cl == null) {
419             throw new CacheListenerAddException();
420         }
421         c.addListener(cl);
422     }
423
424     @Override
425     public Set<IGetUpdates<?, ?>> getListeners(String containerName,
426             String cacheName) {
427         EmbeddedCacheManager manager = this.cm;
428         Cache c;
429         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
430         if (manager == null) {
431             return null;
432         }
433
434         if (!manager.cacheExists(realCacheName)) {
435             return null;
436         }
437         c = manager.getCache(realCacheName);
438
439         Set<IGetUpdates<?, ?>> res = new HashSet();
440         Set<Object> listeners = c.getListeners();
441         for (Object listener : listeners) {
442             if (listener instanceof CacheListenerContainer) {
443                 CacheListenerContainer cl = (CacheListenerContainer) listener;
444                 res.add(cl.whichListener());
445             }
446         }
447
448         return res;
449     }
450
451     @Override
452     public void removeListener(String containerName, String cacheName,
453             IGetUpdates<?, ?> u) {
454         EmbeddedCacheManager manager = this.cm;
455         Cache c;
456         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
457         if (manager == null) {
458             return;
459         }
460
461         if (!manager.cacheExists(realCacheName)) {
462             return;
463         }
464         c = manager.getCache(realCacheName);
465
466         Set<Object> listeners = c.getListeners();
467         for (Object listener : listeners) {
468             if (listener instanceof CacheListenerContainer) {
469                 CacheListenerContainer cl = (CacheListenerContainer) listener;
470                 if (cl.whichListener() == u) {
471                     c.removeListener(listener);
472                     return;
473                 }
474             }
475         }
476     }
477
478     @Override
479     public void tbegin() throws NotSupportedException, SystemException {
480         EmbeddedCacheManager manager = this.cm;
481         if (manager == null) {
482             throw new IllegalStateException();
483         }
484         TransactionManager tm = manager.getCache("transactional-type")
485                 .getAdvancedCache().getTransactionManager();
486         if (tm == null) {
487             throw new IllegalStateException();
488         }
489         tm.begin();
490     }
491
492     @Override
493     public void tcommit() throws RollbackException, HeuristicMixedException,
494             HeuristicRollbackException, java.lang.SecurityException,
495             java.lang.IllegalStateException, SystemException {
496         EmbeddedCacheManager manager = this.cm;
497         if (manager == null) {
498             throw new IllegalStateException();
499         }
500         TransactionManager tm = manager.getCache("transactional-type")
501                 .getAdvancedCache().getTransactionManager();
502         if (tm == null) {
503             throw new IllegalStateException();
504         }
505         tm.commit();
506     }
507
508     @Override
509     public void trollback() throws java.lang.IllegalStateException,
510             java.lang.SecurityException, SystemException {
511         EmbeddedCacheManager manager = this.cm;
512         if (manager == null) {
513             throw new IllegalStateException();
514         }
515         TransactionManager tm = manager.getCache("transactional-type")
516                 .getAdvancedCache().getTransactionManager();
517         if (tm == null) {
518             throw new IllegalStateException();
519         }
520         tm.rollback();
521     }
522
523     @Override
524     public Transaction tgetTransaction() throws SystemException {
525         EmbeddedCacheManager manager = this.cm;
526         if (manager == null) {
527             throw new IllegalStateException();
528         }
529         TransactionManager tm = manager.getCache("transactional-type")
530                 .getAdvancedCache().getTransactionManager();
531         if (tm == null) {
532             return null;
533         }
534         return tm.getTransaction();
535     }
536
537     @Override
538     public boolean amIStandby() {
539         EmbeddedCacheManager manager = this.cm;
540         if (manager == null) {
541             // In case we cannot fetch the information, lets assume we
542             // are standby, so to have less responsability.
543             return true;
544         }
545         return (!manager.isCoordinator());
546     }
547
548     private InetAddress addressToInetAddress(Address a) {
549         EmbeddedCacheManager manager = this.cm;
550         if ((manager == null) || (a == null)) {
551             // In case we cannot fetch the information, lets assume we
552             // are standby, so to have less responsability.
553             return null;
554         }
555         Transport t = manager.getTransport();
556         if (t instanceof JGroupsTransport) {
557             JGroupsTransport jt = (JGroupsTransport) t;
558             Channel c = jt.getChannel();
559             if (a instanceof JGroupsAddress) {
560                 JGroupsAddress ja = (JGroupsAddress) a;
561                 org.jgroups.Address phys = (org.jgroups.Address) c
562                         .down(new Event(Event.GET_PHYSICAL_ADDRESS, ja
563                                 .getJGroupsAddress()));
564                 if (phys instanceof org.jgroups.stack.IpAddress) {
565                     InetAddress bindAddress = ((org.jgroups.stack.IpAddress) phys)
566                             .getIpAddress();
567                     return bindAddress;
568                 }
569             }
570         }
571         return null;
572     }
573
574     public List<InetAddress> getClusteredControllers() {
575         EmbeddedCacheManager manager = this.cm;
576         if (manager == null) {
577             return null;
578         }
579         List<Address> controllers = manager.getMembers();
580         if ((controllers == null) || controllers.size() == 0)
581             return null;
582
583         List<InetAddress> clusteredControllers = new ArrayList<InetAddress>();
584         for (Address a : controllers) {
585             InetAddress inetAddress = addressToInetAddress(a);
586             if (inetAddress != null
587                     && !inetAddress.getHostAddress().equals(loopbackAddress))
588                 clusteredControllers.add(inetAddress);
589         }
590         return clusteredControllers;
591     }
592
593     public InetAddress getMyAddress() {
594         EmbeddedCacheManager manager = this.cm;
595         if (manager == null) {
596             return null;
597         }
598         return addressToInetAddress(manager.getAddress());
599     }
600
601     @Override
602     public InetAddress getActiveAddress() {
603         EmbeddedCacheManager manager = this.cm;
604         if (manager == null) {
605             // In case we cannot fetch the information, lets assume we
606             // are standby, so to have less responsability.
607             return null;
608         }
609
610         return addressToInetAddress(manager.getCoordinator());
611     }
612
613     @Override
614     public void listenRoleChange(IListenRoleChange i)
615             throws ListenRoleChangeAddException {
616         EmbeddedCacheManager manager = this.cm;
617         if (manager == null) {
618             // In case we cannot fetch the information, lets assume we
619             // are standby, so to have less responsability.
620             throw new ListenRoleChangeAddException();
621         }
622
623         if (this.roleChangeListeners == null) {
624             this.roleChangeListeners = new HashSet();
625             this.cacheManagerListener = new ViewChangedListener(
626                     this.roleChangeListeners);
627             manager.addListener(this.cacheManagerListener);
628         }
629
630         if (this.roleChangeListeners != null) {
631             this.roleChangeListeners.add(i);
632         }
633     }
634
635     @Override
636     public void unlistenRoleChange(IListenRoleChange i) {
637         EmbeddedCacheManager manager = this.cm;
638         if (manager == null) {
639             // In case we cannot fetch the information, lets assume we
640             // are standby, so to have less responsability.
641             return;
642         }
643
644         if (this.roleChangeListeners != null) {
645             this.roleChangeListeners.remove(i);
646         }
647
648         if ((this.roleChangeListeners != null && this.roleChangeListeners
649                 .isEmpty())
650                 && (this.cacheManagerListener != null)) {
651             manager.removeListener(this.cacheManagerListener);
652             this.cacheManagerListener = null;
653             this.roleChangeListeners = null;
654         }
655     }
656
657     @Listener
658     public class ViewChangedListener {
659         Set<IListenRoleChange> roleListeners;
660
661         public ViewChangedListener(Set<IListenRoleChange> s) {
662             this.roleListeners = s;
663         }
664
665         @ViewChanged
666         public void viewChanged(ViewChangedEvent e) {
667             for (IListenRoleChange i : this.roleListeners) {
668                 i.newActiveAvailable();
669             }
670         }
671     }
672 }