Wednesday 25 May 2011

Extending Query support via Coprocessor endpoints

Aim:
Using Coprocessor Endpoints to serve user-defined queries.

Background:
Coprocessors have been in HBase trunk for a while now (HBASE-2000). I did some experiments with them and would like to get some feedback on them from the community. A nice explanation of coprocessors is available in Mingjie's Coprocessor post.

Coprocessors can be used for
a) observing server-side operations (administrative ones such as region splits, major/minor compactions, etc.), and
b) client-side operations that are eventually triggered on the region servers (like CRUD operations).
Another use case is letting the end user deploy their own code (some user-defined functionality) and invoke it directly from the client interface (HTable). The latter functionality is called Coprocessor Endpoints.

From an endpoints perspective, a major advantage of using coprocessors is that calls to the 'interested' regions are fired in parallel. They are also useful when the computed result is sparse: it then makes sense to process the data 'region local' and send back just the computed result, rather than shipping the table rows to the client. A canonical example of the latter use case is a row count over a subset of a table: each 'interested' region sends its own row count to the client. It is to be noted that even in the case of Endpoints, the client still has to merge all the individual region results.
This merging is done by the callback method that is defined while invoking these Endpoints. Though the usage is quite clear from the test cases and the above-mentioned Mingjie's blog, here are the major points for using a Coprocessor Endpoint:
 
[I used NGrams as one of the datasets, so the class names reflect this]
  1. Define a coprocessor protocol (NGramProtocol, which extends CoprocessorProtocol). It is an interface that declares the functionality to be exposed as endpoints.
  2. Implement the above interface (NGramImplementation). This concrete class will be instantiated as part of a region (a singleton object from an HRegion perspective).
  3. Create two objects on the client side: a callable and a callback. The former defines the method and arguments of the coprocessor call to be invoked; the latter is called for every result that comes back from an 'interested' region.
  4. Make sure the coprocessor implementation class is loaded in the region. I used the config option: the hbase.coprocessor.region.classes property in hbase-site.xml, whose value is the fully qualified class name of the coprocessor implementation class (org.apache.hadoop.hbase.coprocessor.NGramImplementation); see the snippet below.
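For reference, a minimal hbase-site.xml entry for step 4 could look like this (the class name here is from my setup; substitute your own implementation class):

    <property>
      <name>hbase.coprocessor.region.classes</name>
      <value>org.apache.hadoop.hbase.coprocessor.NGramImplementation</value>
    </property>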
One can refer to the coprocessor test classes (TestCoprocessorEndpoint, TestAggregateProtocol) in main trunk for concrete examples; a condensed sketch of the four steps follows below.
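Here is a rough, condensed sketch of those steps against the coprocessor API as it stood in trunk at the time (the 0.92-era API). The method getTopFrequencies and its signature are illustrative assumptions rather than my exact code; imports and error handling are omitted:

    // 1. The protocol: an interface extending CoprocessorProtocol.
    public interface NGramProtocol extends CoprocessorProtocol {
      // Hypothetical endpoint method: top-k (year, frequency) pairs for every word under a prefix.
      Map<String, List<String>> getTopFrequencies(String prefix, int k) throws IOException;
    }

    // 2. The implementation, loaded into each region (one instance per HRegion).
    public class NGramImplementation extends BaseEndpointCoprocessor implements NGramProtocol {
      @Override
      public Map<String, List<String>> getTopFrequencies(String prefix, int k) throws IOException {
        HRegion region = ((RegionCoprocessorEnvironment) getEnvironment()).getRegion();
        Map<String, List<String>> topK = new TreeMap<String, List<String>>();
        // ... scan the region-local rows starting with 'prefix', keep the k largest frequencies ...
        return topK;
      }
    }

    // 3. and 4. Client side: the callable says what to invoke; the callback merges each region's answer.
    HTable table = new HTable(conf, "ngrams");
    final Map<String, List<String>> merged = new TreeMap<String, List<String>>();
    table.coprocessorExec(NGramProtocol.class,
        Bytes.toBytes("America"), Bytes.toBytes("Americb"),          // key range covering the prefix
        new Batch.Call<NGramProtocol, Map<String, List<String>>>() {
          public Map<String, List<String>> call(NGramProtocol instance) throws IOException {
            return instance.getTopFrequencies("America", 3);         // runs inside each region
          }
        },
        new Batch.Callback<Map<String, List<String>>>() {
          public void update(byte[] regionName, byte[] row, Map<String, List<String>> result) {
            merged.putAll(result);                                   // client-side merge of region results
          }
        });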

Datasets:
I used 2 datasets for testing the coprocessor endpoints and the streaming result functionality.
  •  Bixi data: a public dataset made available by Public Bike Systems Inc. for their operations in Montreal. It is a bike-rental service with 404 (yeah, 404!) sensor-equipped stations where one can take or leave a bike. The sensors emit data such as the number of empty docks, the number of bikes, location, etc., and it is collected once per minute.
Figure 1. Sample Bixi data for 1 station
This information is made available as XML files at https://profil.bixi.ca/data/bikeStations.xml. Fabrice scraped this site to collect data for 70 days (24th Sept - 1st Dec 2010). There were 96,842 data points, about 12 GB in total. Figure 1 shows a sample XML for one station.
  • NGram: Google published n-grams for their ebooks collection. For each word, the data gives its frequency, the number of distinct pages, and the number of books it was printed in, for a specific year. Figure 2 shows the file structure for two sample words. As per the figure, America14 appears 5 times, on 5 pages, in 5 books in the year 1938.
    Figure 2. NGram data sample
  • ycsb: not a dataset, but a well-known benchmarking framework that can be used to insert a configurable amount of data. I used it specifically for the row count operations.
Sample Queries:
The main goal of these experiments was to test use cases for Coprocessor Endpoints. So, I tried to form queries that involve some computation that can be done on the region server side, and that are data parallel, so there are no inter-row dependencies while processing them.
Query 1: (Bixi data) For a given time, geographical coordinates and a radius, get a list of stations with available bikes, sorted by their distance from the given location.
Query 2: (Bixi data) For a given time, list the average bike usage over the last 1, 6, 12 and 18 hours, for a given list of stations or for all 404 stations.
Query 3: (NGrams) Get the top 3 frequencies (with their years) for all words that share a given prefix, e.g. 'America'.
Query 4: (NGrams) Perform Query 3 for a bag of words.

Queries 3 and 4 are not supported by the current Google Ngram Viewer, which takes specific words (a Get-like operation).

The main purpose of Query 4 was to test the "streaming results functionality", which helps navigate a result set generated by coprocessors in an incremental way. It can be compared to a cursor-like feature in an RDBMS, except that here the results are generated with every next() invocation on the cursor infrastructure.
The existing coprocessor endpoint supports doing a computation and then returning the result. These calls are stateless, so the entire region (or the part that needs to be processed) has to be processed completely before the result is returned. There are use cases where a client would rather consume the result incrementally, say in batches of 1000. The streaming functionality uses the current coprocessor RPC mechanism with some modifications, such as treating null results as a valid response.
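Purely to illustrate the idea (the actual ClientCursor API from the streaming-results patch is not shown in this post, so the interface and loop below are hypothetical):

    // Hypothetical cursor-style interface for consuming an endpoint result in batches.
    interface ResultCursor<R> {
      // Returns the next batch; an empty list means the cursor is exhausted.
      List<R> next(int batchSize) throws IOException;
    }

    // Client-side consumption: each next() call triggers one more RPC for up to 1000 results,
    // instead of waiting for the whole region-side computation to finish.
    void consume(ResultCursor<Result> cursor) throws IOException {
      List<Result> batch;
      while (!(batch = cursor.next(1000)).isEmpty()) {
        // process this batch before pulling the next one
      }
    }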

Schema:
Bixi: Based on Queries 1 and 2, I made the timestamp the row key. I assumed hot regions would not be a problem, as there is only one insert per minute (treating it as a real-time application). Station ids become the column qualifiers, and the XML content is extracted and stored as the cell value. So, each row has 404 columns in one column family.
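A minimal sketch of one insert (one minute's snapshot) under this schema; the table name "bixi", the column family "cf" and the Station helper are assumptions for illustration only:

    HTable table = new HTable(conf, "bixi");
    Put put = new Put(Bytes.toBytes("20100924-1001"));   // row key: the snapshot's timestamp
    for (Station s : stationsParsedFromXml) {            // all 404 stations end up in one row
      put.add(Bytes.toBytes("cf"),
              Bytes.toBytes(s.getId()),                  // qualifier: the station id
              Bytes.toBytes(s.getXmlFragment()));        // value: that station's extracted XML
    }
    table.put(put);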

NGrams: Each word becomes the row key, and the years become the column qualifiers. The cell value is a comma-separated list of frequencies (as shown in Figure 3).
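Under this layout, the per-word part of the Query 3 computation is just walking the year columns of a row and keeping the largest frequencies. A rough sketch of that inner loop (the position of the frequency inside the comma-separated value is an assumption):

    // For one row (one word): keep the 3 largest (year, frequency) pairs.
    long[][] topThree(Result row) {
      PriorityQueue<long[]> top3 = new PriorityQueue<long[]>(3, new Comparator<long[]>() {
        public int compare(long[] a, long[] b) {          // min-heap ordered by frequency
          return a[1] < b[1] ? -1 : (a[1] > b[1] ? 1 : 0);
        }
      });
      for (KeyValue kv : row.raw()) {
        long year = Long.parseLong(Bytes.toString(kv.getQualifier()));
        long freq = Long.parseLong(Bytes.toString(kv.getValue()).split(",")[0]);  // assumed format
        top3.add(new long[] { year, freq });
        if (top3.size() > 3) top3.poll();                 // evict the smallest of the four
      }
      return top3.toArray(new long[0][]);
    }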

Figure 3. Sample schema for the above NGram data points.



Experiments done

Cluster: I used a 5-node cluster on EC2 (3 c1.xlarge instances for RS+DN; 1 m1.xlarge for NN+HMaster; 1 m1.xlarge for ZK).
I created an HBase AMI using main trunk code (I don't remember the exact date, but it was around 14th May) and Hadoop append (0.20.3). This image is publicly available.

a) Query 1: This is a Get-like query with some computation, such as calculating the distance between two geographical points. One row is about 90 KB, and a vanilla scan is a better candidate here. Specifically, the average response time was 1.57 sec for Scan and 1.6 sec for the coprocessor-based approach.

b) Query 2: This involves a range scan because there is one row per minute. The scan covers 60, 360, 720 and 1080 rows for the 1, 6, 12 and 18 hour periods respectively. I tested it for 3 stations and for all 404 stations. Figures 4 and 5 show a comparison between the vanilla scan and the coprocessor-based endpoint approach.

Figure 4. Response time for Query 2 for 3 stations


Figure 5. Response time for Query 2 for all 404 stations
The response time varies with the number of rows to be processed. In the case of 3 stations, the vanilla scan outperforms the coprocessor-based approach for 1 and 6 hours, while the coprocessor-based approach becomes slightly better for 12 and 18 hours (Figure 4). The difference becomes clearer when the query is run for all 404 stations (Figure 5).
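For reference, the vanilla-scan side of this comparison is just a range scan over the per-minute rows; a minimal sketch, with the table/family names and the row-key format assumed:

    // Scan the rows for the last 6 hours relative to a given minute (360 rows, one per minute).
    Scan scan = new Scan(Bytes.toBytes("20101015-0400"), Bytes.toBytes("20101015-1000"));
    scan.addFamily(Bytes.toBytes("cf"));
    scan.setCaching(1000);                               // many rows per RPC round trip
    ResultScanner scanner = table.getScanner(scan);
    for (Result minute : scanner) {
      // accumulate per-station usage from this row's XML fragments on the client
    }
    scanner.close();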

c) Queries 3 and 4:
I did the Query 3 testing with the prefix 'America'. There were only 424 words, so there was not much of a difference. The computation (finding the top 3 frequencies) was also fairly light given the node specs. The overall result size was just 630 KB.
I also tested with the prefix 'A', i.e. all words that start with 'A'. This query doesn't have any linguistic meaning, but I fired it just to see what happens when the result size is larger. There were 0.2m such rows, distributed over two regions. The coprocessor used the cursor framework, returning 1000 rows per RPC, and took 21 sec. A scanner with cache size 1000 took 30 sec.
The difference between the approaches becomes quite clear with the bag-of-words query (Figure 6).
It is to be noted that the current coprocessor mechanism determines the range of 'interested' regions before firing the requests. In this case, that range was based on the input words: since the first word starts with 'b' and the last word starts with 'p', requests were sent to all the regions lying in that range. Here, there were 12 intermediary regions. The coprocessor executes the query in 44 sec, while the scanner takes about 158 sec with a cache size of 1000. The main reason for this large time with scan is that all the rows were passed to the client, as I don't think there is any filter I could have used (I might be missing something!). I tried a list of PrefixFilters, but that doesn't solve it because processing stops as soon as a row doesn't match any of the filters. One could ask why not start a new scan for each of these different words, but that's not the objective of the experiment. There may be other use cases with a large number of such words, and that approach will not scale there. Moreover, with the coprocessor we can do the scan in parallel, so why not use it.


Figure 6. Response time for query 3 and 4, in seconds.
There is one twist in this experiment for Queries 3 and 4 that is worth mentioning. Since I was repeatedly testing Queries 3 and 4 with these prefixes, the corresponding rows should have been in the region server cache (or the OS cache), at least for Query 3 where I was reading just 424 rows. So there was no disk read for Query 3. Still, it should have been the same for both approaches, as I was taking the average of the last 3 readings.
I plan to do more randomised (and more realistic) testing for this type of query. Any suggestions are most welcome here.

The current streaming results framework uses the existing coprocessor RPC, where for each invocation of the endpoint, all the individual region results are passed through the callback object before being returned to the client. In the case of Query 4, there were 12 regions, and some of them (9, in fact) didn't contain any of these words. They became the bottleneck, as they had to be read completely just to figure out that no row belonged to the bag of words. I could have improved this by using HRegionInfo to look at the start/end row of each region (it occurred to me only after I was done). The query did exercise the streaming result path, as the region containing the last two words was queried twice (i.e. two RPCs).

d) Row count: For this I used ycsb to insert data into the table. Each row has 10 columns and its overall size was 1 KB. I used a scanner cache of 1000 in all my experiments.
There are 3 variations: one using the scanner API, one using a MapReduce job (there is a RowCounter MR job in HBase), and one using a coprocessor.
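As a point of reference for the coprocessor variant, the aggregate endpoint already in trunk (the AggregateProtocol mentioned earlier) exposes a row count. A rough client-side sketch, with the ycsb table name and column family assumed, and the AggregateImplementation coprocessor assumed to be loaded on the regions:

    // Row count via the aggregate coprocessor endpoint: one request per region, fired in parallel.
    // (The MapReduce variant is the bundled org.apache.hadoop.hbase.mapreduce.RowCounter job.)
    AggregationClient aggClient = new AggregationClient(conf);
    Scan scan = new Scan();
    scan.addFamily(Bytes.toBytes("family"));             // the family to count over (name assumed)
    long rows = aggClient.rowCount(Bytes.toBytes("usertable"),
        new LongColumnInterpreter(), scan);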


Figure 7. Response time of the row count operation for the 3 approaches, in seconds.







With 1m and 10m rows, the coprocessor approach performs better than the other two. The reason is that it works in parallel, with one thread per region.
With 100m rows, it failed as the client gave a timeout error (default 1 minute). The plausible reason is that the default behaviour assumes the client will get a reply within 1 minute, but there were 156 regions divided among 3 region servers. To answer the request for a specific region, that region needs to be scanned completely. So, effectively, the client expected the full table scan to be done within 1 minute at most, otherwise it throws a timeout error. Even though there were 100 handlers per region server, some regions took more than 1 minute to scan and the client timed out on them. Since the request is 'fail fast', if any region request takes more than 1 minute, it fails the entire operation.
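If one only wanted to get past the timeout (rather than fix the underlying full-region scan), the client RPC timeout could presumably be raised; a sketch on the client side, where the 60000 ms default of hbase.rpc.timeout matches the 1-minute behaviour seen above:

    // Raise the client RPC timeout before creating the table handle (value here is illustrative).
    Configuration conf = HBaseConfiguration.create();
    conf.setInt("hbase.rpc.timeout", 600000);            // 10 minutes in ms; default is 60000
    HTable table = new HTable(conf, "usertable");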
MapReduce does it in 631 sec; it scales really well as the data grows, and is good for batch jobs as it provides more throughput.

It is to be noted that the coprocessor-based row count was not using the streaming result. I plan to use it with a batch size of, say, 1000. This should avoid the timeout exception, as for each RPC every region only needs to read 1000 rows before returning the count.

The streaming result framework has been up for review for a while. It was half-baked when I posted it, and it still needs more work: getting the return types of the ClientCursor object right, and testing it for boundary cases like NSRE, etc.

I would like to hear what others think of these experiments as this is an important component of my Master's thesis at UofA, Canada. Your feedback will be very helpful.

9 comments:

  1. Thanks a lot to my colleague Koosha (http://www.blogger.com/profile/08833052466255327530) for his feedback/suggestions on the content style and structure.

  2. This work has been accepted in the Research Track of ServiceWave 2011 (http://servicewave.eu/2011/).

  3. hey himanshu! i am having trouble figuring out where all these coprocessor classes/interfaces are in the hadoop/hbase jars. i am using the cloudera distribution but can't find them anywhere. Is there any separate jar that has these classes?

  4. @ Sid: The coprocessor framework will be in the 0.92 version, which is in the final phases of its release.
    To play around with them, you can download the third release candidate from http://people.apache.org/~stack/.

  5. @himanshu ya that's what i read on the cloudera site. they plan to put it in CDH4 which is due Q2. thanks your blog was very useful!

  6. Mingjie's Coprocessor post has been relocated to https://blogs.apache.org/hbase/entry/coprocessor_introduction

    1. Yeah, changed. Thanks for pointing this out.

  7. Is the code publicly available? If so, can you post the URL?

  8. Indeed, can you share details/code on the streaming strategy you used? It seems that Writable forces a push mode: the coprocessor code can push into the stream at its own pace, but the consumer cannot pull and consume at its own pace from its control thread. Instead I need a push-pull converter, i.e. a bucket in memory or on disk, because the RPC executor just calls my writable.readFields and then closes the stream. That means either I need to serialize again off-heap or to disk, or in readFields I just copy bytes to memory-mapped disk, say with MapDB, creating asymmetry in the read/write implementation of my Writable value; later, reading from the disk-offloaded value, I apply deserialization and create a pullable stream like an Iterable or a Java 8 Stream.
