Klout is proud to announce the initial release of Brickhouse, an open source project extending Hadoop and Hive with a collection of useful user-defined functions (UDFs). Its aim is to make the Hive Big Data developer more productive, and to enable scalable and robust dataflows. Klout makes use of many of the projects in the open source Big Data community, such as Hadoop, HBase, and Hive, and we want to contribute the tools we’ve developed and found useful back to that community.
Like what you see? Join the Klout engineering team to help us leverage big data!
Why Brickhouse?
Here at Klout, we deal with lots of big data. We also need to move and produce prototypes quickly. We love the Apache Hive project because it allows us to define a complex chain of map-reduce jobs in a single query.
In order to implement some fairly common use-cases, however, we found functionality missing from the standard Hive distribution. We also found the same functionality being implemented over and over, simply because people didn’t know that a function had already been written. We needed a central repository to encourage re-use within the company. We suspect that some of this functionality has been implemented, and re-implemented, by various groups around the world, but for various reasons it has never been provided in a single location, under a simple license. While there may be some overlap with existing functionality from various sources, we package it in a more convenient manner.
Another pain we felt was the transition from prototype to production. With Hive, it is fairly easy to come up with a query to generate a one-off dataset, no matter how wacky the requirements. Converting that to a scalable, robust, production pipeline is a different story. We needed not just a library of UDFs, usable by multiple developers, but a library of MapReduce design patterns to produce robust scalable dataflows. In future blog posts, we’ll discuss various techniques to use Brickhouse to turn your Hive queries into scalable data-flows.
What’s in the Brickhouse?
The Brickhouse covers a fairly wide area of functionality, but can be grouped along various themes:
Brickhouse implements a “collect” UDAF for aggregating values into lists and maps, along with other data-structure manipulations missing from standard Hive, such as “set_diff” and “intersect_array” for combining and comparing maps and lists. We also include UDFs which help you write more efficient queries, like conditional_emit and multiday_count. For example, the “collect_max” UDAF aggregates the top N values over large sets, potentially avoiding expensive re-sorts.
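For instance, assuming hypothetical tables user_tags(user_id, tag) and user_tag_scores(tag, user_id, score), collect and collect_max might be used like this (a sketch; consult the wiki for the exact signatures):

```sql
-- Aggregate all of a user's tags into a single array with collect:
select user_id, collect( tag ) as tags
from user_tags
group by user_id;

-- Keep only the 10 highest-scoring users per tag, without a full sort
-- (the third argument caps how many entries the map retains):
select tag, collect_max( user_id, score, 10 ) as top_users
from user_tag_scores
group by tag;
```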
Hive provides some simple support for JSON, with get_json_object, json_tuple, and the various JSON SerDes out there. These are great, but we often need to access JSON with more complicated formats, and we would like to do so from a UDF, rather than having to create a new table with a JSON SerDe every time.
The Brickhouse UDFs json_map and json_split allow you to convert JSON directly into maps and arrays. The UDF to_json outputs JSON text for arbitrary Hive structures, and from_json allows you to parse arbitrary JSON into the Hive type of your specification. This makes it easier to interact with external systems which use JSON as a data representation.
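As a sketch of how these fit together (the from_json template argument follows the style shown on the wiki; treat the exact signatures as assumptions):

```sql
-- Parse a JSON array or object directly into Hive types:
select json_split( '["a","b","c"]' );   -- an array<string>
select json_map( '{"x":1,"y":2}' );     -- a map

-- Serialize an arbitrary Hive structure to JSON text:
select to_json( named_struct( 'user_id', 123, 'score', 45.6 ) );

-- Parse JSON into the type you specify with a template value:
select from_json( '["a","b"]', array( '' ) );
```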
For scalable reach estimations, we provide a simple sketch set implementation. The sketch set UDFs sketch_set, combine_sketch, and estimated_reach allow you to estimate the number of unique values in a set, without having to store each member in an array, or perform an expensive query like “SELECT COUNT( DISTINCT )”. This allows you to estimate reach for very large sets, without having to re-sort all of your data.
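For example, estimating daily unique visitors from a hypothetical page_views(day, user_id) table might look like this (a sketch, assuming the UDF names above):

```sql
-- Build a compact sketch per day, then estimate the distinct count,
-- instead of an exact but expensive count( distinct user_id ):
select day, estimated_reach( sketch_set( user_id ) ) as approx_uniques
from page_views
group by day;
```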
We’ve wrapped the Bloom filter implementation shipped with the Hadoop/HBase distribution into UDFs. This allows you to determine set membership for large sets. Brickhouse has methods bloom and distributed_bloom to generate Bloom filters for large sets and store them in the distributed cache, where they can be used to avoid an expensive join.
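The pattern looks roughly like the following (a sketch only; the exact mechanics of saving and loading the bloom file are described on the wiki, and bloom_contains is our assumed name for the membership check):

```sql
-- Step 1: build a bloom filter over the smaller set and write it out.
insert overwrite directory '/tmp/active_users_bloom'
select bloom( user_id ) from active_users;

-- Step 2: in a later query, probe the bloom instead of joining.
add file /tmp/active_users_bloom;
select e.*
from events e
where bloom_contains( e.user_id, distributed_bloom( 'active_users_bloom' ) );
```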
To productionize our code, we’ve added UDFs to throw an error and “blow the whistle” if a sanity test fails. We like to validate certain datasets at various points in the chain with the assert UDF, to make sure processing stops when we discover a serious problem. We also provide write_to_graphite and write_to_tsdb so we can more easily track certain metrics with each production flow. This helps us track the growth of our data, and helps us plan accordingly.
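For example, a sanity check on a daily score table might look like this (assuming assert takes a boolean condition and a message; treat the signature as an assumption and check the wiki):

```sql
-- Fail the job loudly if today's table looks suspiciously small:
select assert( cnt > 1000000, concat( 'too few scores: ', cnt ) )
from ( select count(*) as cnt from user_score ) t;
```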
We’re also experimenting with different ways to integrate Hive and HBase. With the hbase_batch_put UDAF and a well-clustered query, you can do bulk imports into HBase directly from Hive, without killing your region-servers.
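A rough sketch of the pattern (the config keys and argument order here are assumptions; check the wiki before relying on them):

```sql
-- Cluster by row key first, so the puts arrive at the region-servers
-- in well-ordered batches rather than one random RPC per row:
select hbase_batch_put(
         map( 'table_name', 'user_scores', 'family', 's' ),
         user_id, cast( score as string ) )
from ( select user_id, score
       from user_score
       cluster by user_id ) well_clustered;
```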
How can I use the Brickhouse?
For now, we are providing Brickhouse in source-code form at https://github.com/klout/brickhouse , so you need to perform your own local build. At some point we will publish to the public Maven repositories, so that it can be declared as a third-party dependency from Maven or Ivy.
- We assume that you have already been using and loving Hive. Because we’d like to utilize some advanced features, we require at least Hive 0.9.0.
- Install Git, Maven, and a Java JDK, if you do not have them already.
- Clone (or fork) the repo from GitHub: https://github.com/klout/brickhouse
- Run “mvn package” to produce the jar file.
- Add the jar to your HIVE_AUX_JARS_PATH, or add it to the cache with an “add jar” command.
- Declarations for all the UDFs are under src/main/resources/brickhouse.hql. Source this file from Hive.
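Put together, a first Hive session might look like this (the jar name varies by version, and the paths below are placeholders for your own checkout):

```sql
-- After building with "mvn package" in your brickhouse checkout:
add jar /path/to/brickhouse/target/brickhouse-<version>.jar;
source /path/to/brickhouse/src/main/resources/brickhouse.hql;

-- Smoke-test that the functions were registered:
describe function collect;
```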
A simple example
Should you use the Brickhouse? What can it help you with? Check out the wiki at https://github.com/klout/brickhouse/wiki/ for more specific documentation. We plan on adding more documentation, and more blog posts, but here is an example that demonstrates the power of Brickhouse.
One common use-case we have here at Klout is calculating the top-k values. For example, who are the top 50 users with the highest Klout scores, for some grouping?
Imagine you had a table user_score with columns user_id and score. Say you had another table containing each user’s primary location, called user_geo, with columns user_id and state. How could we find the users with the top 50 Klout Scores for each state? This is certainly possible in multiple ways. To join and sort the tables, you could run a query like:
select state, user_id, score, group_count( state )
from (
    select state, user_id, score
    from (
        select geo.state, geo.user_id, sc.score
        from user_geo geo
        join user_score sc on( geo.user_id = sc.user_id )
    ) join_geo
    distribute by state
    sort by state, score desc
) score_distrib
where group_count( state ) <= 50;
This joins the two tables, and then distributes and sorts them to get the top values. The group_count UDF returns the running count of consecutive rows in which the column “state” has had the same value, and the clause “where group_count( state ) <= 50” caps the users at 50 per state.
But this gives one line per user/state pair. We’d prefer one line per state, with a map<string,double> of user scores. This is where the “collect” UDAF becomes useful. We can do something like the following to aggregate into a map.
select state, collect( user_id, score ) as score_map
from (
    select state, user_id, score, group_count( state )
    from (
        select state, user_id, score
        from (
            select geo.state, geo.user_id, sc.score
            from user_geo geo
            join user_score sc on( geo.user_id = sc.user_id )
        ) join_geo
        distribute by state
        sort by state, score desc
    ) score_distrib
    where group_count( state ) <= 50
) score_ranked
group by state;
This will work, but notice the extra aggregation; it adds an extra map-reduce job to our flow. Also, this query could potentially be very unbalanced: some states might be much larger than others. Even though the values are capped, a large number of values may need to be sorted, and the sorting could be very unbalanced.
However, Brickhouse provides a collect_max UDAF, which is similar to the “collect” UDAF, but retains only the top N values seen (50 in our case). The query could be re-written as:
select state, collect_max( user_id, score, 50 ) as score_map
from (
    select geo.state, geo.user_id, sc.score
    from user_geo geo
    join user_score sc on( geo.user_id = sc.user_id )
) join_geo
group by state;
This reduces the number of steps, because the ranking is performed within the collect itself. The flow doesn’t need to sort by score; it only needs to distribute by state.
Also, if Hive map-side aggregation is enabled, performance is enhanced even further. The top 50 values can be accumulated on the map side before being flushed out to the sorter, allowing more processing to be done sooner. And the job should always be fairly well-balanced, since the data structure holding the top 50 values for a million records takes no more space than the one for only 100 records. This is just one example.
The Klout engineering team plans to continue extending the Brickhouse, and improving its overall quality. We’ll add functionality which is not available elsewhere and solves problems for many groups dealing with big data. We may decide to exclude certain things whose scope is limited to a narrow domain.
Let us know what you think. Let us know if you find it useful, or what needs some more work. More complete documentation is available at the GitHub wiki at https://github.com/klout/brickhouse/wiki/UDF-Overview. If you find a bug, please file an issue at https://github.com/klout/brickhouse/issues.