ZooKeeper loss of events problem... fixed
In my latest project at LinkedIn, I have been using ZooKeeper to track the state of all deployed services on every machine in production. The state is then used to drive the deployment as well as monitor the system. I ran into a really tricky bug which took me several months to address (the hardest part was finding what the actual problem was). The (simplified) code was the following:
import org.apache.zookeeper.*
import org.apache.zookeeper.KeeperException.NoNodeException
import org.apache.zookeeper.Watcher.Event.EventType

ZooKeeper zk = ...
Map state = [:]

synchronized(lock) {
  state = trackChildren(state)
}

// handle add/delete of children
private Map trackChildren(Map oldState) {
  def newState = new HashMap()
  def children = zk.getChildren('/state', childrenWatcher as Watcher)
  children.each { child ->
    if(oldState[child] == null) { // ADD
      trackChild(newState, "/state/${child}")
    } else {
      newState[child] = oldState[child] // no change
    }
  }
  // DELETE: newState does not contain the children that have disappeared
  return newState
}

// handle child update
private void trackChild(Map newState, String path) {
  if(path == null)
    return
  def child = new File(path).name
  try {
    newState[child] = zk.getData(path, childWatcher as Watcher, null)
  } catch(NoNodeException e) {
    /* ok node is gone */
    newState.remove(child)
  }
}

def childrenWatcher = { WatchedEvent event ->
  if(event.type == EventType.NodeChildrenChanged) {
    synchronized(lock) {
      state = trackChildren(state)
    }
  }
}

def childWatcher = { WatchedEvent event ->
  def child = event.path ? new File(event.path).name : null
  synchronized(lock) {
    if(event.type == EventType.NodeDataChanged) {
      trackChild(state, event.path)
    }
    if(event.type == EventType.NodeDeleted) {
      state.remove(child)
    }
  }
}
What this code essentially does is the following:
- Sets a children watcher on /state to be notified of children addition/deletion
- For each child, sets a node watcher to be notified of child modification and deletion. Note that you need this watcher because ZooKeeper will not call the parent watcher when a child gets modified!
- As a result, the map called state always contains a 'copy' of the data of all children.
Here are a few key concepts about ZooKeeper watches, as described in the ZooKeeper documentation:
1. Watches are one time triggers; if you get a watch event and you want to get notified of future changes, you must set another watch.
2. Because watches are one time triggers and there is latency between getting the event and sending a new request to get a watch, you cannot reliably see every change that happens to a node in ZooKeeper. Be prepared to handle the case where the znode changes multiple times between getting the event and setting the watch again. (You may not care, but at least realize it may happen.)
3. A watch object, or function/context pair, will only be triggered once for a given notification. For example, if the same watch object is registered for an exists and a getData call for the same file and that file is then deleted, the watch object would only be invoked once with the deletion notification for the file.
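To make concepts #1 and #3 concrete, here is a toy Java model. It is a hypothetical class, not the ZooKeeper client API: a watch is dropped as soon as it fires, registering the same watcher object twice on a path still yields a single notification, and a change made before the watch is re-registered is simply not delivered (which is how concept #2 bites).

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical toy model of ZooKeeper watch semantics; this is NOT the ZooKeeper client API.
public class OneTimeWatches {
    // path -> watchers currently registered on that path
    private final Map<String, Set<Consumer<String>>> watches = new HashMap<>();

    // Registering the same watcher object twice on a path keeps a single entry (concept #3)
    public void watch(String path, Consumer<String> watcher) {
        watches.computeIfAbsent(path, p -> new LinkedHashSet<>()).add(watcher);
    }

    // Delivers the event and drops the registrations: watches are one-time triggers (concept #1),
    // so a change happening before the next watch() call goes unnoticed (concept #2)
    public void trigger(String path, String event) {
        Set<Consumer<String>> fired = watches.remove(path);
        if (fired != null) {
            fired.forEach(w -> w.accept(event));
        }
    }

    public static void main(String[] args) {
        OneTimeWatches zk = new OneTimeWatches();
        int[] calls = {0};
        Consumer<String> watcher = e -> calls[0]++;
        zk.watch("/state/child1", watcher);
        zk.watch("/state/child1", watcher);             // same object: still one registration
        zk.trigger("/state/child1", "NodeDataChanged");
        zk.trigger("/state/child1", "NodeDataChanged"); // lost: no watch was re-registered
        System.out.println(calls[0]);                   // 1
    }
}
```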
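Now consider the following sequence, where child1 is deleted and immediately re-created:
byte[] child1State = ...
zk.delete('/state/child1', -1)
zk.create('/state/child1', child1State, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT) // ACL and mode are just an example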
In this scenario, I would receive 2 events:
- /state childrenWatcher would be fired with a NodeChildrenChanged event, and this is where the bug gets triggered. According to key concept #2, events can essentially be collapsed: in this case I get only 1 NodeChildrenChanged event, and when I list my children (getChildren), I see no difference: child1 has been removed and added, but I cannot tell. With the code written this way, I am effectively losing the 'add' child event.
- /state/child1 childWatcher would be fired with a NodeDeleted event, resulting in state.remove('child1')... at this point I have indeed lost the state of child1.
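The core of the bug is that comparing two getChildren() listings cannot reveal a child that was deleted and re-created in between. Here is a minimal Java illustration (a hypothetical class using plain collections instead of ZooKeeper) of the set difference the original trackChildren effectively computes:

```java
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Illustration only: diffing two getChildren() snapshots, as the original trackChildren does.
public class ChildrenDiff {
    // Names present now but not before: what trackChildren treats as ADD
    static Set<String> added(Collection<String> before, Collection<String> after) {
        Set<String> result = new TreeSet<>(after);
        result.removeAll(before);
        return result;
    }

    // Names present before but not now: what trackChildren treats as DELETE
    static Set<String> removed(Collection<String> before, Collection<String> after) {
        Set<String> result = new TreeSet<>(before);
        result.removeAll(after);
        return result;
    }

    public static void main(String[] args) {
        List<String> before = List.of("child1", "child2");
        // child1 was deleted and re-created before getChildren() ran: the listing is identical
        List<String> after = List.of("child1", "child2");
        // prints "added=[] removed=[]": the re-created child1 is invisible to the diff
        System.out.println("added=" + added(before, after) + " removed=" + removed(before, after));
    }
}
```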
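If I add a delay between the 2 calls:
byte[] child1State = ...
zk.delete('/state/child1', -1)
Thread.sleep(5000)
zk.create('/state/child1', child1State, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT) // ACL and mode are just an example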
I actually get 2 events and I have enough time to process them so it works. But that is clearly not the right fix.
In order to fix the problem, I did 2 things:
- The children watcher is only dealing with 'ADDs' and does not handle 'DELETE' anymore
- The child watcher handles 'DELETE' but also tries to set a watcher no matter what
ZooKeeper zk = ...
Map state = [:]

synchronized(lock) {
  state = trackChildren(state)
}

// handle ADD only... let the child watcher handle DELETE
private Map trackChildren(Map oldState) {
  def newState = new HashMap(oldState)
  def children = zk.getChildren('/state', childrenWatcher as Watcher)
  children.each { child ->
    if(newState[child] == null)
      trackChild(newState, "/state/${child}")
  }
  return newState
}

// handle child update
private void trackChild(Map newState, String path) {
  if(path == null)
    return
  def child = new File(path).name
  try {
    newState[child] = zk.getData(path, childWatcher as Watcher, null)
  } catch(NoNodeException e) {
    /* ok node is gone */
    newState.remove(child)
  }
}

def childrenWatcher = { WatchedEvent event ->
  if(event.type == EventType.NodeChildrenChanged) {
    synchronized(lock) {
      state = trackChildren(state)
    }
  }
}

def childWatcher = { WatchedEvent event ->
  def child = event.path ? new File(event.path).name : null
  synchronized(lock) {
    if(event.type == EventType.NodeDataChanged) {
      trackChild(state, event.path)
    }
    if(event.type == EventType.NodeDeleted) {
      state.remove(child)
      // WARNING!!! UNINTUITIVE BUT CRITICAL!!!
      trackChild(state, event.path)
    }
  }
}
The code in the childWatcher most likely looks very unintuitive, but what is really happening is that the child on which the watcher is set will always receive the NodeDeleted event, and according to key concept #1, if you want to receive further events you need to set another watcher. It seems unintuitive to set a watcher on a node that just got deleted (and most of the time it will trigger a NoNodeException, which is handled properly), but the node may very well have been re-created between the time it was deleted and the time you check, in which case you are guaranteed to have a watcher set on it and you won't lose it. If the node was deleted and not re-created right away, then the childrenWatcher will be the one setting it up again if it gets re-created in the future (pay close attention to the synchronization in the code, as it is critical for the system to work properly). Here are the possible sequences of events:
- /state/child1 simply deleted
  - childrenWatcher receives NodeChildrenChanged => does nothing
  - childWatcher receives NodeDeleted => state.remove('child1')
  or
  - childWatcher receives NodeDeleted => state.remove('child1')
  - childrenWatcher receives NodeChildrenChanged => does nothing
- /state/child1 deleted / recreated (events get collapsed)
  - childrenWatcher receives NodeChildrenChanged => does nothing
  - childWatcher receives NodeDeleted => state.remove('child1') then state['child1']=new state + watcher
  or
  - childWatcher receives NodeDeleted => state.remove('child1') then state['child1']=new state + watcher
  - childrenWatcher receives NodeChildrenChanged => does nothing
- /state/child1 deleted / recreated (events are not collapsed)
  - childrenWatcher receives NodeChildrenChanged => does nothing
  - childWatcher receives NodeDeleted => state.remove('child1')
  - childrenWatcher receives NodeChildrenChanged => state['child1']=new state + watcher
  or
  - childWatcher receives NodeDeleted => state.remove('child1')
  - childrenWatcher receives NodeChildrenChanged => state['child1']=new state + watcher
  or
  - childrenWatcher receives NodeChildrenChanged => does nothing
  - childWatcher receives NodeDeleted => state.remove('child1') then state['child1']=new state + watcher
  - childrenWatcher receives NodeChildrenChanged => does nothing
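To check the reasoning above, here is a small single-threaded Java simulation of the fixed logic. It is a sketch under simplifying assumptions: the hypothetical store map stands in for the children of /state, events are delivered by calling the handler methods directly, and watch registration is not modeled. It replays the collapsed delete / re-create case:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simulation of the fixed watcher logic; 'store' stands in for the znodes under /state.
// It does not talk to ZooKeeper and does not model watch registration.
public class FixedWatcherSim {
    final Map<String, String> store = new HashMap<>(); // simulated children of /state -> data
    final Map<String, String> state = new HashMap<>(); // local copy maintained by the watchers
    private final Object lock = new Object();

    // Like trackChild: re-read the node, or drop it if it is gone (the NoNodeException branch)
    private void trackChild(String child) {
        String data = store.get(child);
        if (data == null) state.remove(child); else state.put(child, data);
    }

    // Like the fixed childrenWatcher: only handles ADD
    void onNodeChildrenChanged() {
        synchronized (lock) {
            for (String child : store.keySet()) {
                if (!state.containsKey(child)) trackChild(child);
            }
        }
    }

    // Like the fixed childWatcher on NodeDeleted: remove, then immediately try to re-read
    void onNodeDeleted(String child) {
        synchronized (lock) {
            state.remove(child);
            trackChild(child);
        }
    }

    public static void main(String[] args) {
        FixedWatcherSim sim = new FixedWatcherSim();
        sim.store.put("child1", "v1");
        sim.onNodeChildrenChanged();   // initial ADD
        sim.store.remove("child1");    // delete...
        sim.store.put("child1", "v2"); // ...and re-create before any event is processed
        sim.onNodeChildrenChanged();   // collapsed event: nothing new to add
        sim.onNodeDeleted("child1");   // the critical re-read picks up v2
        System.out.println(sim.state); // {child1=v2}
    }
}
```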
Connecting to a local vm using jmx knowing the process id.
On the project I am currently working on at LinkedIn, I needed to programmatically access the jmx interface of a java VM. The caller is another program written in java/groovy and knows the process id of the VM it wants to talk to. Note that both VMs are running on the same host. jconsole does exactly that, so it should be pretty straightforward. In the end, it is not very complicated, but it took me several hours of head scratching and debugging to get it right.
The page Monitoring and Management Using JMX Technology describes a technique which allows you to attach to another virtual machine using com.sun.tools.attach.VirtualMachine (which is not part of the standard JDK 1.6 API but a Sun-specific class, so it is available if you use a Sun VM; it is also part of the OpenJDK project).
Extracting the JMXServiceURL (groovy):
import javax.management.remote.JMXServiceURL

private static final String CONNECTOR_ADDRESS =
"com.sun.management.jmxremote.localConnectorAddress";
private JMXServiceURL extractJMXServiceURL(pid)
{
// attach to the target application
com.sun.tools.attach.VirtualMachine vm =
com.sun.tools.attach.VirtualMachine.attach(pid.toString());
try
{
// get the connector address
String connectorAddress =
vm.getAgentProperties().getProperty(CONNECTOR_ADDRESS);
// no connector address, so we start the JMX agent
if (connectorAddress == null) {
String agent = vm.getSystemProperties().getProperty("java.home") +
File.separator + "lib" + File.separator +
"management-agent.jar";
vm.loadAgent(agent);
// agent is started, get the connector address
connectorAddress =
vm.getAgentProperties().getProperty(CONNECTOR_ADDRESS);
}
// establish connection to connector server
return new JMXServiceURL(connectorAddress);
}
finally
{
vm.detach()
}
}
Once you have obtained the JMXServiceURL, you need a JMXConnector:
def connector = JMXConnectorFactory.connect(url);
def connection = connector.getMBeanServerConnection();
// use the connection...
When I tried this approach, it worked fine in my development environment, but when I deployed it on a test machine, I got the following exception:
Caused by: com.sun.tools.attach.AttachNotSupportedException: Unable to open door: target process not responding or HotSpot VM not loaded
  at sun.tools.attach.SolarisVirtualMachine.<init>(SolarisVirtualMachine.java:68)
  at sun.tools.attach.SolarisAttachProvider.attachVirtualMachine(SolarisAttachProvider.java:42)
  at com.sun.tools.attach.VirtualMachine.attach(VirtualMachine.java:195)
  at com.sun.tools.attach.VirtualMachine$attach.call(Unknown Source)
Quite an unusual error message if you are not familiar with Solaris... The issue I uncovered here is that the ability to attach to another VM is JDK 1.6 only: on the test machine I was trying to connect from a 1.6 VM to a 1.5 VM, and that does not work (note that in my case I have no choice and need to run with both VMs).
To fix this issue, I needed a way that would work with both VMs. There is another internal API which can be used to extract the JMXServiceURL, the class sun.management.ConnectorAddressLink:
private JMXServiceURL extractJMXServiceURL(pid)
{
String serviceURL = null
try
{
serviceURL = sun.management.ConnectorAddressLink.importFrom(pid as int)
}
catch(IOException e)
{
log.warn("Cannot find process ${pid}")
}
if(serviceURL == null)
return null
else
return new JMXServiceURL(serviceURL)
}
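Whichever way the JMXServiceURL is obtained, using it looks the same. Here is a minimal Java sketch (Java rather than Groovy, and not the code from my project) that connects, reads the HeapMemoryUsage attribute of the standard java.lang:type=Memory MBean, and always closes the connector. To be runnable on its own, the demo assumes it may expose its own platform MBeanServer through an RMI connector server bound to 127.0.0.1; the startLocalServer helper is purely illustrative.

```java
import java.io.IOException;
import java.lang.management.ManagementFactory;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXConnectorServer;
import javax.management.remote.JMXConnectorServerFactory;
import javax.management.remote.JMXServiceURL;

public class JmxQuery {
    // Connects to the VM behind 'url', reads its current heap usage, and always closes the connector
    static long heapUsed(JMXServiceURL url) throws Exception {
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection connection = connector.getMBeanServerConnection();
            CompositeData heap = (CompositeData) connection.getAttribute(
                    new ObjectName("java.lang:type=Memory"), "HeapMemoryUsage");
            return (Long) heap.get("used");
        } finally {
            connector.close();
        }
    }

    // Demo only (hypothetical helper): exposes this VM's platform MBeanServer on loopback
    static JMXConnectorServer startLocalServer() throws IOException {
        System.setProperty("java.rmi.server.hostname", "127.0.0.1");
        JMXConnectorServer server = JMXConnectorServerFactory.newJMXConnectorServer(
                new JMXServiceURL("service:jmx:rmi://127.0.0.1"), null,
                ManagementFactory.getPlatformMBeanServer());
        server.start();
        return server;
    }

    public static void main(String[] args) throws Exception {
        JMXConnectorServer server = startLocalServer();
        try {
            // with no JNDI path, getAddress() is a service:jmx:rmi URL carrying an encoded stub
            System.out.println("heap used: " + heapUsed(server.getAddress()) + " bytes");
        } finally {
            server.stop();
        }
    }
}
```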
Something to keep in mind is that there is a difference between 1.5 and 1.6:
This solution was working great in my development environment but was failing again in my testing environment. I spent close to 3 hours on trial and error: the issue now was an "IOException: process not found" error when calling the importFrom method. At some point, I realized that jconsole was not listing my java processes and that the jps command was not returning anything either.
I then ran my test program under the truss command (Solaris), which logs all the system calls. That is how I realized that the method looks for a file called /tmp/hsperfdata_<username>/<pid> (where username is the user executing the unix process). This folder was empty, which is why nothing was returned. I later realized that the permissions on the folder were wrong and were preventing the VM from creating its file in it. The frustrating part is that the failure was totally silent: it was never reported in any log file and there was no way to turn on a debugging level to see the error. If it weren't for the truss command, I am not certain how I could have figured this out, since it is totally undocumented and fails silently. Changing the permissions on the folder immediately fixed the problem!
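To double-check this kind of problem without truss, here is a small diagnostic sketch in Java. It assumes the Solaris/Linux layout described above (/tmp/hsperfdata_<username>/<pid>), that the user.name system property matches the unix user, and Java 9+ for ProcessHandle; the class and method names are hypothetical. A VM that cannot write to that folder simply does not appear in the list, so a missing entry for the current VM points straight at the permission problem.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Diagnostic sketch; assumes the Solaris/Linux layout /tmp/hsperfdata_<username>/<pid>.
public class HsPerfDataCheck {
    static File perfDataDir(String tmpDir, String user) {
        return new File(tmpDir, "hsperfdata_" + user);
    }

    // Returns the pids that published a perf data file (the files jps and jconsole look for)
    static List<String> publishedPids(File dir) {
        List<String> pids = new ArrayList<>();
        String[] names = dir.list(); // null if the directory is missing or cannot be read
        if (names == null) return pids;
        for (String name : names) {
            if (name.matches("\\d+")) pids.add(name);
        }
        Collections.sort(pids);
        return pids;
    }

    public static void main(String[] args) {
        File dir = perfDataDir("/tmp", System.getProperty("user.name"));
        System.out.println(dir + " exists=" + dir.exists() + " canWrite=" + dir.canWrite());
        List<String> pids = publishedPids(dir);
        System.out.println("published pids: " + pids);
        // this VM should be listed too; if it is not, its perf data file could not be created
        String self = String.valueOf(ProcessHandle.current().pid());
        System.out.println("this VM listed: " + pids.contains(self));
    }
}
```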
This silent failure is a very good demonstration of why the pattern:
try
{
// do something which may throw an exception but if it does I will ignore
// and continue
}
catch(Exception e)
{
// ok ignored
}
is a very bad pattern and should be replaced with something like:
try
{
// do something which may throw an exception but if it does I will ignore
// and continue
}
catch(Exception e)
{
// ok ignored
if(log.isDebugEnabled())
log.debug("ignored exception", e)
}
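Here is the same idea in plain Java, using java.util.logging (Level.FINE in place of the debug level of the logging library used above). The parseOrDefault method is just a hypothetical example of code that legitimately ignores an exception but keeps it traceable:

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class IgnoredExceptionLogging {
    private static final Logger log = Logger.getLogger(IgnoredExceptionLogging.class.getName());

    // Parses 'text', falling back to 'fallback' on bad input, without making the failure invisible
    static int parseOrDefault(String text, int fallback) {
        try {
            return Integer.parseInt(text);
        } catch (NumberFormatException e) {
            // ok ignored... but still traceable once FINE logging is enabled for this logger
            if (log.isLoggable(Level.FINE)) {
                log.log(Level.FINE, "ignored exception while parsing '" + text + "'", e);
            }
            return fallback;
        }
    }

    public static void main(String[] args) {
        System.out.println(parseOrDefault("42", 0));   // 42
        System.out.println(parseOrDefault("oops", 0)); // 0, plus a FINE record if enabled
    }
}
```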
Configuring apache -> tomcat load balancer
Now that kiwidoc has been released, I can share my experience on how I configured the system in 'production'. kiwidoc is hosted at rackspace on 2 machines: a small one for the load balancer (apache web server) and a bigger one for the main application (tomcat). Configuring it was quite a challenge and I just want to share how I did it. Note that the instructions are for Ubuntu 9.04 with a stock installation of apache (2.2.11-2ubuntu2.3) and tomcat (6.0.18-0ubuntu6) using the standard apt* commands.
What did I want to achieve?
My main application is a web application and is deployed in tomcat under [/java]. The load balancer (apache) should be able to direct traffic to multiple instances of tomcat when the need arises. I also wanted http://www.kiwidoc.com/ (in other words [/]) to be redirected to [/java/], which is my main entry point. The catch is that some pages need to be served by apache (like some error pages) and this was not easy to configure.
Configuring tomcat (Part I)
On the tomcat side, I set up a new connector for ajp (file /etc/tomcat6/server.xml):
<!-- Define an AJP 1.3 Connector on port 8009 -->
<Connector port="8009" protocol="AJP/1.3" redirectPort="8010"
proxyName="www.kiwidoc.com" proxyPort="80" URIEncoding="UTF-8"/>
I chose ajp because it is supposed to be much faster than standard http. So far the configuration is not too difficult. proxyName and proxyPort are used so that the methods ServletRequest.getServerName() and ServletRequest.getServerPort() return the public values (www.kiwidoc.com and 80) rather than those of the tomcat machine.
Configuring apache
On the apache side, I added the following 4 links (directory /etc/apache2/mods-enabled):
proxy_ajp.load -> ../mods-available/proxy_ajp.load
proxy.load -> ../mods-available/proxy.load
proxy.conf -> ../mods-available/proxy.conf
proxy_balancer.load -> ../mods-available/proxy_balancer.load
Then under /etc/apache2/sites-enabled, I have the following file (which I called 100-lb):
<VirtualHost *:80>
  ##########################
  # DocumentRoot
  DocumentRoot /var/www
  <Directory /var/www/>
    Options FollowSymLinks MultiViews
    AllowOverride None
    Order allow,deny
    allow from all
  </Directory>

  ##########################
  # Error handling
  ErrorLog /var/log/apache2/error.log
  LogLevel warn
  CustomLog /var/log/apache2/access.log combined
  ErrorDocument 503 /errors/error_503.html
  ErrorDocument 404 /errors/error_404.html

  ##########################
  # Proxy
  ProxyRequests Off
  <Proxy *>
    Order deny,allow
    Allow from all
  </Proxy>

  <Proxy balancer://kiwidoc>
    BalancerMember ajp://123.123.123.123:8009
    BalancerMember ajp://123.123.123.124:8009
  </Proxy>

  ProxyPass /errors !
  ProxyPass /images !
  ProxyPass / balancer://kiwidoc/
</VirtualHost>
Let's cover each section:
- The first 2 ProxyPass lines define an exclusion rule: all requests to [/errors] and [/images] are served by apache and not forwarded (this is required because of the 3rd ProxyPass line).
- The last ProxyPass line sends all traffic for [/] to the balancer.
- Static content should be served by apache, which does it very efficiently.
- If all tomcat instances are unreachable, apache issues a 503 error code, which gets mapped to [/errors/error_503.html]; without the exclusion rule, apache would try to fetch that page from tomcat (which we know is unreachable...). This happens, for example, when I need to shut down the main application for maintenance: you see a nice maintenance page.
Configuring tomcat (Part II)
We are almost there. The issue now is that [/] goes to tomcat, which needs to handle it properly. So here is what I did:
Under /var/lib/tomcat6/webapps/ROOT (which is what tomcat uses for [/]), I have a mini webapp:
WEB-INF/web.xml
---------------
<web-app xsi:schemaLocation='http://java.sun.com/xml/ns/j2ee
http://java.sun.com/xml/ns/j2ee/web-app_2_4.xsd'
version='2.4'
xmlns='http://java.sun.com/xml/ns/j2ee'
xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'>
<error-page>
<error-code>404</error-code>
<location>/errors/error_404.html</location>
</error-page>
</web-app>
index.jsp
---------
<% response.sendRedirect("http://www.kiwidoc.com/java/"); %>
errors/error_404.html
Let's cover each section:
Conclusion
First of all, it works, it achieves what I described earlier and I believe it covers all cases, like maintenance mode and 'not found' error pages. Nonetheless, I wish I had found a cleaner/simpler way to do it, in other words one that does not require creating a ROOT webapp on the tomcat side. The main issue stems from the fact that I was unable to express in apache the simple rule: redirect [/] ONLY to [/java/], because a [/] rule is treated as [/*]. I would be surprised if it were not possible; it is just very hard to find the documentation that explains how to do it.
Improving performances of a Lucene Search
Lucene is a popular java based text search engine. You add documents to the index using an IndexWriter and then you can search the index using an IndexSearcher. In order to search, the most flexible api is to use the callback api:
indexSearcher.search(query, new HitCollector() {
  public void collect(int docID, float score) {
    // do whatever you want...
  }
});
For every document which matches the query, Lucene calls the hit collector with the document id of the match as well as the score. This document id is internal to Lucene and cannot be relied upon as it can (and will) change (for example when optimizing the index). The usual practice is to add a field to the document that you index which contains an id which has meaning in your application:
Document doc = new Document();
doc.add(new Field("ID", String.valueOf(myID),
Field.Store.YES, Field.Index.NOT_ANALYZED));
This is useful as well when you want to update/delete the document from the index:
indexWriter.deleteDocuments(new Term("ID", String.valueOf(myID)));
In the callback loop, you can then retrieve your id from the Lucene document id by doing indexSearcher.doc(docID) which returns a Document from which you can simply extract your previously stored id.
This works fine, but is relatively expensive. Lucene is very good at caching the index in memory, but the stored document is not part of that cache, so retrieving it requires a disk access. Depending on how many documents match the query, this can have serious implications for performance.
It is true that you cannot rely on the Lucene id in general because it changes. Nonetheless, while the index searcher is open and until you close it, this id will not change. We can use this property to add some caching, which improves performance quite a bit. The idea is that when you open the searcher, you simply 'read' and cache all your ids in memory (and you discard the cache when you close it):
String[] myCache = FieldCache.DEFAULT.getStrings(searcher.getIndexReader(), "ID"); // the array is indexed by the Lucene doc id: myCache[docID] is your ID!
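The effect of this cache can be sketched without Lucene. In the hypothetical class below, the loader function stands in for the expensive indexSearcher.doc(docID).get("ID") call: it runs exactly once per document when the cache is built, and every hit afterwards is a plain array lookup keyed by the Lucene doc id.

```java
import java.util.function.IntFunction;

// Plain-Java sketch of the per-searcher id cache; 'loader' is a hypothetical stand-in
// for indexSearcher.doc(docID).get("ID"), i.e. the expensive stored-field read.
public class DocIdCache {
    private final String[] ids;

    // Pays the loading cost once, when the searcher is opened (maxDoc = number of doc ids)
    public DocIdCache(int maxDoc, IntFunction<String> loader) {
        ids = new String[maxDoc];
        for (int doc = 0; doc < maxDoc; doc++) {
            ids[doc] = loader.apply(doc);
        }
    }

    // Per-hit lookup is an array access; only valid while that same searcher stays open
    public String idFor(int docID) {
        return ids[docID];
    }

    public static void main(String[] args) {
        int[] loads = {0};
        DocIdCache cache = new DocIdCache(3, doc -> { loads[0]++; return "app-" + (100 + doc); });
        for (int hit : new int[] {2, 0, 2}) {
            System.out.println(hit + " -> " + cache.idFor(hit));
        }
        System.out.println("loads: " + loads[0]); // 3: one per document, none per hit
    }
}
```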
To make things nicer, I hid all of this under some apis and created my own wrapper:
// a hit collector with userData
public interface LuceneHitCollector<T> {
void collect(int doc, float score, T userData);
}
// wraps a lucene searcher to use the new hit collector
public class LuceneIndexSearcherImpl<T> implements LuceneIndexSearcher<T>
{
private final IndexSearcher _indexSearcher;
private final T[] _userData;
public LuceneIndexSearcherImpl(IndexSearcher indexSearcher, T[] userData) {
_indexSearcher = indexSearcher;
_userData = userData;
}
public LuceneHitCollector<T> search(Query query, final LuceneHitCollector<T> collector)
throws IOException {
_indexSearcher.search(query, new HitCollector() {
public void collect(int doc, float score) {
collector.collect(doc, score, _userData[doc]);
}});
return collector;
}
}
The performance improvements are quite dramatic: a query that used to take around 350ms now takes about 14ms! Pretty nice. Of course, this works well only if you open your searcher and keep it open for several queries, which is the case in my application. This technique requires some extra memory, but if you can afford it, it is totally worth it.
Note that the api I created is using generics: I wanted to be able to use the payload feature if I need later on to store more than the id. For example if I wanted to store an id and a timestamp, I could create a small serializable object and store it (serialized) as a byte array in the payload of the field. When I open the searcher I could read all the payloads and deserialize them in an array of the correct object type (instead of an array of Strings like in the example). To create an array of the proper size you can simply use the searcher.maxDoc() api. The code to use the payload feature is a little cumbersome/complicated and would require too much code to demonstrate in this blog.
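As a rough sketch of the payload idea, here is how an (id, timestamp) pair could be turned into a byte array and decoded back into an array indexed by doc id. This is an assumption-laden illustration: it uses a fixed 16-byte ByteBuffer encoding instead of Java serialization, the class name is hypothetical, and reading the payloads out of Lucene is deliberately left out.

```java
import java.nio.ByteBuffer;

// Hypothetical (id, timestamp) value stored as a 16-byte payload.
public class IdAndTimestamp {
    final long id;
    final long timestamp;

    IdAndTimestamp(long id, long timestamp) {
        this.id = id;
        this.timestamp = timestamp;
    }

    // Fixed-size encoding: 8 bytes of id followed by 8 bytes of timestamp (big-endian)
    byte[] toBytes() {
        return ByteBuffer.allocate(16).putLong(id).putLong(timestamp).array();
    }

    static IdAndTimestamp fromBytes(byte[] bytes) {
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        long id = buffer.getLong();
        long timestamp = buffer.getLong();
        return new IdAndTimestamp(id, timestamp);
    }

    // Builds the per-searcher cache; payloadsByDoc would have searcher.maxDoc() entries
    static IdAndTimestamp[] decodeAll(byte[][] payloadsByDoc) {
        IdAndTimestamp[] cache = new IdAndTimestamp[payloadsByDoc.length];
        for (int doc = 0; doc < payloadsByDoc.length; doc++) {
            // a missing payload (e.g. a deleted document) leaves the slot empty
            if (payloadsByDoc[doc] != null) cache[doc] = fromBytes(payloadsByDoc[doc]);
        }
        return cache;
    }

    public static void main(String[] args) {
        byte[][] payloads = { new IdAndTimestamp(42, 1000L).toBytes(), null };
        IdAndTimestamp[] cache = decodeAll(payloads);
        System.out.println(cache[0].id + " @ " + cache[0].timestamp); // 42 @ 1000
    }
}
```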
This post is presenting one solution to improve the performances of a Lucene search but there are many other techniques. It definitely works if you have a little extra memory to spare. I wanted to thank the LinkedIn search team for the inspiration!