Klout Engineering

Leap Second Induced Downtime Post-Mortem

July 1st, 2015 by Ian Kallen

So yesterday was the 6/30/2015 leap second day and boy was it (not) fun.

We’ve had previous leap second events and they also were (not) a barrel of laughs. The last one triggered a myriad of Java Virtual Machine process malfunctions, especially amongst services that depended on Zookeeper sessions for distributed system coordination. Remedying this required finding the errant processes and restarting them. Klout uses HBase as a distributed big data store; we have multiple clusters for different workloads, and HBase’s region (essentially, data shard) management has proven to be very sensitive to Zookeeper anomalies. The leap second incident prior to that caused JVM resource consumption to spike; it actually triggered a power surge and tripped a circuit. That data center incident took many hours to work through as we got power restored and clusters running again.

Since those prior leap second events, we’ve upgraded our JVM versions in a number of our clusters, and some of our HBase clusters were recently refreshed with updated hardware and software versions. The HBase version we upgraded to isn’t the final one we’re landing on this year, due to compatibility with extant components, but this interim state was a big improvement. The data infrastructure, data science, Lithium community analytics and Klout’s core services teams have been working together over the past several months to consolidate Klout and Lithium’s workloads onto common infrastructure and updated versions, and to normalize the division of operational responsibilities. We’re very excited about the version of HBase we’re ultimately moving to; it improves numerous performance and stability characteristics.

Anticipating the 6/30/2015 leap second, we had our engineers on stand-by. Since the prior leap second events were unlike each other, it was difficult to predict what this one would look like. The one area that we expected to be most susceptible to anomalies was, of course, HBase. We expected that, worst case, we’d need to restart one or more of our HBase clusters, that we could do so in 10-15 minutes, and that we’d suffer an outage of no more than 15-20 minutes. The hour struck and when Klout went down, we immediately homed in on our HBase clusters. Klout’s service is highly dependent upon HBase – if it’s not operating correctly, neither is Klout’s service. The HBase clusters that were updated earlier this year were fine; the ones we’re planning to refresh were not (we had a timeline to get this done in mid-June, but too many slips in the predecessor projects have pushed it to late July).

This is where things got weird. HBase relies on a distributed filesystem (HDFS), and during cluster restarts, while the cluster master was processing its transaction logs (the SplitLogManager), it reported that the filesystem “went away” while its recovery was in mid-flight. From outside appearances, HDFS seemed fine: none of the runtimes were misbehaving and data integrity seemed intact. After a few iterations of that, it seemed clear that there was something more nuanced going on. OK, back up and restart HDFS; not a quick process, but one we needed to do to eliminate it as a culprit. Then we restarted the HBase recovery. The HDFS restart seemed to help a little: the HBase recovery cadence was steadier, but still troublingly slow. In fact, it seemed to get progressively slower and grind to a halt after a while, but it was no longer hitting the HDFS “went away” condition.

The region servers seemed fine. We hooked up jconsole to the JMX port we have exposed on the HBase master. At first it reported nothing alarming: heap usage was normal and garbage collections were short, sub-second episodes. Then, long into the recovery cycle, we hit a two minute stop-the-world garbage collection pause! The long pauses are killers. The master’s Zookeeper sessions time out and its ephemeral node disappears. The region servers then conclude it’s dead and don’t recover when the pause has passed, so the cluster essentially dies. We raised the Zookeeper session timeouts to three minutes, but the next iteration hit the same cluster death condition. Suspecting that the “tenured” generation of objects, cleaned up by the concurrent mark/sweep garbage collector, had too much work to do, we dialed down the CMSInitiatingOccupancyFraction JVM parameter and restarted. It ran its course for a long time and then hit the same two minute pause and cluster death. Upon closer observation, the pauses were in the ParNew (Parallel New) garbage collector, which handles the “young generation” of JVM objects. My intimacy with JVM tuning isn’t especially deep, so I may have this wrong. My understanding is that if there’s too much fragmented garbage in the young generation (lots of little objects), the garbage collection pause is a long stop-the-world event. We were four hours into the downtime and still stymied by the failed HBase recovery. We had the NewSize and MaxNewSize JVM parameters set to 192m; we dialed them down to 128m and restarted: voila! The recovery cadence was five times faster and it never hit any long GC pauses. Within 15 minutes we had the cluster recovered and service restored.
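The pauses above were spotted by watching the master’s collectors over JMX with jconsole. Below is a minimal sketch of doing the same check programmatically. It assumes only the standard java.lang.management API, which exposes the per-collector numbers jconsole displays. GcWatch, its method names and any threshold you pass are hypothetical, not Klout tooling. The JVM flags in the comment are the ones named in the text, with the 128m value from the fix.

```scala
import java.lang.management.ManagementFactory
import scala.jdk.CollectionConverters._

// Sketch, not Klout's tooling: sample cumulative GC time per collector through the
// standard GarbageCollectorMXBean API (the data jconsole shows), then flag collectors
// whose time grew by more than a threshold between two samples.
// The young-generation change from the post is passed to the JVM as:
//   -XX:NewSize=128m -XX:MaxNewSize=128m
object GcWatch {
  // Cumulative collection time in ms, keyed by collector name
  // (e.g. "ParNew" and "ConcurrentMarkSweep" on a CMS-configured HotSpot JVM).
  // A bean reports -1 if the time is undefined for that collector.
  def sample(): Map[String, Long] =
    ManagementFactory.getGarbageCollectorMXBeans.asScala
      .map(b => b.getName -> b.getCollectionTime)
      .toMap

  // Collectors whose accumulated GC time increased by more than thresholdMs.
  // This is total time across all collections in the interval, so it suggests,
  // but does not prove, a single long pause.
  def slowCollectors(before: Map[String, Long], after: Map[String, Long],
                     thresholdMs: Long): Set[String] =
    after.collect {
      case (name, t) if t - before.getOrElse(name, 0L) > thresholdMs => name
    }.toSet
}
```

Sampling once a minute and calling slowCollectors(previous, current, 60000L) would have pointed at ParNew rather than the CMS collector.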

We turned our attention to other internally facing services that ran older JVMs and needed to be restarted. Once things stabilized, we came up for air and reckoned with how things ran so far off the tracks. We’ve had HBase cluster restarts before; they interrupt service but are generally not long-lived efforts. We’re still not clear why the SplitLogManager’s work was any different from any other cluster restart, requiring JVM tuning parameters different from our otherwise stable-for-years parameters.

Some key take-aways:

  • we need to complete our upgrades to get us on updated JVM versions everywhere and newer versions of our big data infrastructure
  • when preparing for an anticipated incident, designate a communications coordinator – when things got weird, that person could have told the rest of the staff that things had gotten really weird, so that decisions could be made about what further communication was required
  • our teams need more smart people who are experienced with Java programming, distributed systems, JVM tuning and Linux systems knowledge – is that you? join us!

We expect there will be another leap second within the next 18 months or so, and we should be on a much better footing with updated JVMs and infrastructure. Until then, we hope this peek into the fun (or not) of running complex big data infrastructure at scale was entertaining!

Posted in Big Data, Open Source, Platform Engineering

Scoozie: Creating Big Data Workflows at Klout

August 1st, 2013 by Matthew Johnson

Workflow specification is a major pain point for us at Klout. Workflows, a way of chaining together Big Data jobs (MapReduce jobs, Hive scripts, distributed copies, file system tasks, etc.), are an essential tool in organizing large batch jobs into scalable and maintainable pipelines. One standard Hadoop workflow system is Apache Oozie, an XML-based technology. Scoozie is our solution: it improves the developer experience of using Oozie.

Oozie requires writing copious amounts of XML and often leads to copying and pasting a mashup of jobs and configurations from file to file. These specifications are not type safe, prone to typos, difficult to read and understand, and often difficult to test. We wanted to make workflows easy to write, understand, and maintain while encouraging best-practices.

On the path to a solution to this problem, the team evaluated several options, including replacing Oozie and implementing a new workflow engine. We decided, however, that Oozie actually works great—the major pain is the specification. So we thought, how can we make Oozie easier to use? The answer was to develop Scoozie.

The Philosophy Behind Scoozie

The natural choice to solve these problems was to implement a Scala DSL on top of Oozie’s XML schema (Scala + Oozie = Scoozie). Scala’s strong typing and support for common-sense readability (examples to come), as well as the fact that Klout is already a huge Scala shop, made Scala the perfect fit. Developers at Klout with no prior Scala experience have found Scoozie and Scala easy to pick up and intuitive to use.

In the traditional Oozie workflow, nodes are forward-chaining; the developer specifies where a node goes to after it is finished. This can be hard to track, and requires the developer to hold in mind every node in the chain when developing the workflow. Instead of following this design philosophy, we decided to focus the Scoozie DSL around dependencies.  In this way the developer only needs to look at one node in a workflow at a time, and think only about what that node depends on. Scoozie will figure out the rest.  This greatly reduces cognitive overhead and amplifies productivity.
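To make the dependency-first idea concrete, here is a minimal sketch of how a tool could turn per-node dependency declarations into a valid execution order. It uses Kahn’s topological sort and reports any cycle instead of an order. The Map[String, Set[String]] representation and the node names are hypothetical. This is an illustration of the idea, not Scoozie’s API or its internal algorithm.

```scala
import scala.collection.mutable

// Hypothetical representation: each node lists only the nodes it depends on.
// Illustrative only; this is not Scoozie's API or its internal algorithm.
object DependencyOrder {
  // Kahn's algorithm. Returns Right(order) with every node after all of its
  // dependencies, or Left(nodes) naming the nodes in, or blocked by, a cycle.
  def order(deps: Map[String, Set[String]]): Either[Set[String], List[String]] = {
    val nodes: Set[String] = deps.keySet ++ deps.values.flatten
    def depsOf(n: String): Set[String] = deps.getOrElse(n, Set.empty[String])

    // Count of not-yet-scheduled dependencies per node.
    val remaining = mutable.Map.from(nodes.iterator.map(n => n -> depsOf(n).size))
    // Reverse edges: for each node, the nodes waiting on it.
    val dependents: Map[String, Set[String]] =
      nodes.iterator.map(n => n -> nodes.filter(m => depsOf(m).contains(n))).toMap

    // Start with nodes that have no dependencies (sorted for a stable result).
    val ready = mutable.Queue.from(nodes.toList.filter(remaining(_) == 0).sorted)
    val result = mutable.ListBuffer.empty[String]
    while (ready.nonEmpty) {
      val n = ready.dequeue()
      result += n
      for (m <- dependents(n).toList.sorted) {
        remaining(m) -= 1
        if (remaining(m) == 0) ready.enqueue(m)
      }
    }
    if (result.size == nodes.size) Right(result.toList) else Left(nodes -- result)
  }
}
```

The developer only ever writes lines like "d" -> Set("b", "c"). The global ordering, and the detection of impossible cyclic specifications, fall out of the algorithm.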

What it Looked Like Before

Oozie XML is nasty, and in order to give a taste of exactly how nasty it is, here’s an example of what a very simple pipeline of two parallel jobs looks like without Scoozie.

[Screenshot: Oozie XML for a simple pipeline of two parallel jobs]

Clearly many of these parameters will be common to most, if not all, MapReduce jobs that need to be specified, yet these parameters and configurations must be manually copied from file to file or even from job to job, as in the example above. A single missing parameter, or a typo in a minor change, could lead to hours of frustrated debugging later on.

What it Looks Like Now 

Focus on Dependencies

[Screenshot: general structure of a Scoozie workflow specification]

This is an example of the general structure of any Scoozie workflow specification. As can be seen, the developer only needs to worry about dependencies and hence only needs to focus on one node at a time.

Don’t Worry About Forks and Joins

[Screenshot: Scoozie workflow whose fork/join structure is derived from dependencies]

Again, the Scoozie developer only needs to worry about the dependencies of any given node.

Scoozie is smart enough to figure out fork/join structures for you, and even verify workflows against Oozie’s strict fork/join requirements. Already, it can be seen how much clearer this is than the XML example provided above.
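One simple way to derive fork/join structure from dependencies is to give each node a level, one more than its deepest dependency. Every level with several nodes can then be wrapped in a fork/join pair. Below is a sketch under that assumption. ForkJoinLayout and its output format are hypothetical, and Scoozie’s own algorithm may well differ. This layering is conservative: joining after every level always produces well-nested fork/join pairs, as Oozie requires, but can give up some parallelism when branches have different lengths.

```scala
import scala.collection.mutable

// Hypothetical sketch: derive a fork/join layout from dependency declarations.
// Not Scoozie's algorithm; it assumes the dependency graph is acyclic.
object ForkJoinLayout {
  // Level of a node: 0 with no dependencies, otherwise 1 + the deepest dependency.
  def levels(deps: Map[String, Set[String]]): Map[String, Int] = {
    val memo = mutable.Map.empty[String, Int]
    def level(n: String): Int = memo.get(n) match {
      case Some(l) => l
      case None =>
        val ds = deps.getOrElse(n, Set.empty[String])
        val l = if (ds.isEmpty) 0 else 1 + ds.map(level).max
        memo(n) = l
        l
    }
    (deps.keySet ++ deps.values.flatten).map(n => n -> level(n)).toMap
  }

  // Single-node levels run directly; multi-node levels become fork(...) then join.
  def layout(deps: Map[String, Set[String]]): List[String] =
    levels(deps).groupBy(_._2).toList.sortBy(_._1).flatMap { case (_, members) =>
      val names = members.keys.toList.sorted
      if (names.size == 1) names else List(names.mkString("fork(", ", ", ")"), "join")
    }
}
```

For a diamond where b and c depend on a, and d depends on both, the layout is a, fork(b, c), join, d.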

Modularity and Scalability

[Screenshot: Scoozie workflow nesting the previous example as “Subwf”]

Scoozie lets the developer nest workflows, allowing for better readability, abstraction, and reuse of code (notice that “Subwf” on line 4 is the entire workflow specified in the previous example). This is another step toward making best practices easy to follow in the specification of big-data workflows.

Using Scala’s Super-Flexible Syntax

[Screenshot: Scoozie DSL syntax example]

Scoozie strives to create a syntax that makes the DSL intuitive to read and pick up. Scoozie makes this common-sense syntax possible by doing the legwork for you in the background.

In addition, custom job factory methods are easy to create and use, making it natural to reuse jobs to prevent the duplication of configuration code in workflow specifications. Scoozie helps you minimize boilerplate!
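Here is a sketch of what such a factory method can look like. It uses a hypothetical MapReduceJob case class rather than Scoozie’s real job types. The shared configuration lives in one place with defaults, so each workflow states only what differs. The property names are old-style Hadoop keys used purely as placeholders.

```scala
// Hypothetical job description; Scoozie's actual job classes differ.
final case class MapReduceJob(name: String, mapper: String, reducer: String,
                              config: Map[String, String])

object Jobs {
  // Settings that would otherwise be copied into every job definition.
  // Property names and values are illustrative placeholders.
  val commonConfig: Map[String, String] = Map(
    "mapred.output.compress" -> "true",
    "mapred.reduce.tasks" -> "10"
  )

  // Factory method: callers supply only what is specific to their job.
  // Entries in `extra` override the shared defaults on key conflicts.
  def mrJob(name: String, mapper: String, reducer: String,
            extra: Map[String, String] = Map.empty): MapReduceJob =
    MapReduceJob(name, mapper, reducer, commonConfig ++ extra)
}
```

A typo-prone copy of a dozen properties becomes a one-line call such as Jobs.mrJob("bigJoin", "com.example.JoinMapper", "com.example.JoinReducer", Map("mapred.reduce.tasks" -> "200")).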

Open Sourcing Scoozie

Scoozie has been a success at Klout, so we hope others in the Scala and Big Data communities can take advantage of this project as well. Please check out the Scoozie repo for more code examples and an in-depth tutorial on how to get started using Scoozie. And if you’re interested in working with Scala and big data at Klout, visit our careers page or take a look at Klout’s other Big Data open source project called Brickhouse.

Additionally, we would like to provide a shout-out to a great open-source project, scalaxb, an sbt plugin that takes .xsd files and creates matching Scala case classes. Scoozie populates these case classes, which are then automatically converted to XML by scalaxb. This plugin saved us a lot of headaches in the actual process of conversion to XML.

Posted in Big Data, Open Source

Checking in on Foursquare

May 14th, 2013 by Prantik Bhattacharyya

At Klout, our mission is to empower every person by unlocking their influence. Klout measures influence across several social networks and shows users how they impact the people connected to them. As social networks evolve, it’s important for us to understand new ways users are communicating with their audiences and update our analytical methods to integrate new features.

We are happy to announce an update for our users who have Foursquare connected. Starting today, new Foursquare stats will be displayed on users’ dashboards and will appear under “Your Recent Moments”.

We’d like to highlight our Director of Product Engineering, Kevin Liu, and show how his dashboard looks with the latest update. Kevin is an avid Foursquare user who loves to explore San Francisco’s restaurant scene.

We measure influence on Foursquare by our users’ ability to drive social actions. Interactions on a user’s check-ins and tips as well as the number of friends and mayorships form the basis of how our algorithm measures Foursquare actions. Details about how the actions are incorporated into the Klout score can be found here.

We welcome you to connect your Foursquare account to Klout. A special thanks to the team at Klout for making this happen, particularly David Ross, Naveen Gattu, Adithya Rao and Sean McGary. If you’re interested in joining the team behind the Klout algorithm, we are hiring!

Posted in Science

Beautiful Moments: Klout + Instagram

March 28th, 2013 by Prantik Bhattacharyya

Klout loves Instagram! So we are extremely excited to announce we are adding Instagram into the Klout score. By connecting your Instagram account with Klout, you can broaden your influence and showcase your beautiful moments from Instagram to the people you influence and the world.

How Instagram Will Impact Your Klout Score

We designed the Klout score so that the addition of a network can only increase your score, never decrease it. Users with low Instagram activity levels will not see a score drop after connecting their Instagram accounts. If you are an active Instagram user, you can expect your score to go up.

In the following chart, we show the change in the Klout score distribution for users who have already added Instagram as a network to their Klout account. More than 10 percent of Klout users who have Instagram connected will see their scores increase into the 50 to 70 score range.

So, who will see a score change? In the next chart, we show the percentage of users who will see a change. Over 77 percent of users who have connected their Instagram accounts will see a score increase between 1 and 5 points. And 16 percent of Instagram-connected users will see a score increase of 5 or more points.

Actions on Instagram

We measure influence on Instagram by analyzing your ability to drive social actions. The influence of the people engaging with your content, the number of followers, and the actions taken on the photos you post (e.g., “Likes” and “Comments”) form the foundation of our analytical methods. Details about how signals are incorporated into the Klout score can be found here.

Check your score summary on your dashboard for detailed statistics and to learn how your interactions on Instagram feed into your Klout score.

The number of followers and social interactions on the photos uploaded by users increases significantly with each 10-point score range. A user with a score between 40 and 50 has, on average, approximately 150 followers and 300 actions on their photos over the past 90 days. In contrast, a user with a score between 80 and 90 has approximately 50,000 followers and 200,000 social actions, both more than two orders of magnitude greater (roughly 330 and 670 times as many, respectively).

How Instagram Will Enhance Your Klout Moments

With millions of beautiful photographs being posted every day on Instagram, it’s easy to miss a picture that resonated with your network. We took a closer look at a few users who stood out to us.

Shannon Ferguson is a digital media executive, Instagramming pictures from Paris, France. Because Instagram is one of her main networks of influence, her Klout score has risen by 18.97 points, bringing her to a 63.35!

Dan Marker-Moore, a motion designer from Los Angeles, uses Instagram to share pictures of beautiful landscapes, skylines, and architecture. He maintains a blog dedicated to unique photographs of payphones. Although Dan isn’t active on Twitter, where he has only a handful of followers, his Instagram engagement has bumped his score from 40.49 to 74.64.

Lonely Planet’s Instagram account brings you the best photographs from their employees’ international vacations. Their showcasing of the personal side of their brand has brought them a dedicated audience on Instagram. These photogenic moments have moved their score from 93.83 to 95.31.

Chevrolet celebrates the ingenuity of vehicle engineering and design through their Instagram account. Whether they are giving users a sneak peek at VIP events or satisfying their midday ‘need for speed’ craving—their Instagram account is making waves in the Instagram brand community. Our score integration has brought them from a score of 93.44 to 95.25.

A special thanks to the team at Klout for making this happen, particularly David Ross, Naveen Gattu, Adithya Rao and Christine Nguyen. If you’re interested in joining the team behind the Klout Score, come join us!

Posted in Science

Introducing a Little Box of Big iOS Tools

February 12th, 2013 by Josh Whiting
Klout for iPhone is built on LittleBox iOS Toolkit

In developing Klout’s iOS application, we ended up building a variety of internal application classes that contained behaviors broadly applicable to many iOS applications. This code was so useful that we decided to move it out of the proprietary application and put it into an external library for release to the open source community. As such, the library doesn’t exclusively tackle any one major problem, nor does it provide a comprehensive framework for app development in general. It’s simply a useful grab-bag of single-purpose tools. We call it LittleBox. In many cases it solves problems you didn’t even know you had, so take a look!

What’s inside?

  • Singleton management classes. Singletons are an essential part of managing state in a complex application without turning your app delegate into a kitchen sink. However, singletons present challenges of their own: how do you subclass them? How do you reset state? How do they communicate with non-singletons? LittleBox offers solutions to these problems.
  • Specific utility singletons. These provide centralized management of analytics instrumentation and debugging events, the status bar network activity indicators, and global application spinners.
  • A wide variety of one-off utility methods. These cover important processing tasks for strings, dates, UIView geometry, UIImages, and OAuth 1.0a headers in NSMutableURLRequests.
  • UIView subclasses to help with programmatic management of buttons and activity indicators.
  • Memory management helper classes to avoid common retain-related problems with NSTimer and NSArray.
  • And more.

As Klout’s iOS app continues to evolve, we’ll be adding more and more to this collection. And of course, your feedback and pull requests are more than welcome in improving its quality and coverage.

But hey, if you’re an iOS developer interested in what we’re doing, you’ll be glad to know we’re hiring!

You can explore LittleBox on GitHub.

Posted in Mobile, Open Source, Platform Engineering

Iteratees in Big Data at Klout

January 25th, 2013 by Naveen Gattu

At Klout we calculate social influence of users across several social networks and we must be able to collect and aggregate data from these networks scalably and within a meaningful amount of time. We then deliver these metrics to our users visually.

Our users and clients expect data to be up to date and accurate and it has been a significant technical challenge to reliably meet these goals. In this blog post we describe the usage of Play! Iteratees in our redesigned data collection pipeline. This post is not meant to be a tutorial on the concept of Iteratees, for which there are many great posts already such as James Roper’s post and Josh Suereth’s post. Rather, this post is a detailed look at how Klout uses Iteratees in the context of large scale data collection and why it is an appropriate and effective programming abstraction for this use case. In a later post we will describe our distributed Akka-based messaging infrastructure, which allowed us to scale and distribute our Iteratee based collectors across clusters of machines.

Iteratees in a Nutshell

In a sentence, Iteratees are the functional way to model producers and consumers of streams of data. Single chunks of data are iteratively produced by an Enumerator, optionally mapped/adapted by an Enumeratee and then consumed by an Iteratee. Each stage can be composed and combined together in a pipeline-like fashion.

Enumerator (produce data) → Enumeratee (map data) → Iteratee (consume data)

Other compositions are also possible with the Play! Iteratee library, such as Enumerator interleaving (multiple concurrent enumerators), Enumeratee chaining, and Iteratee folding (grouping enumerated data into larger chunks).
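The Play! types are asynchronous, so they don’t translate line for line into plain Scala. As a rough, synchronous analogy built only on the standard library (this is not the Play! API), an Enumerator behaves like a lazy source of chunks. An Enumeratee is then a function from one stream to another, and an Iteratee a fold that consumes the stream into a result. The type aliases below are hypothetical stand-ins chosen to mirror the three roles.

```scala
// Synchronous stand-ins for the three roles; not the Play! Iteratee types.
object NutshellPipeline {
  type Enumerator[E] = () => Iterator[E]             // produces chunks on demand
  type Enumeratee[A, B] = Iterator[A] => Iterator[B] // adapts chunks
  type Iteratee[E, R] = Iterator[E] => R              // consumes chunks into a result

  // Compose producer, adapter and consumer into one pipeline run.
  def run[A, B, R](source: Enumerator[A], adapt: Enumeratee[A, B], sink: Iteratee[B, R]): R =
    sink(adapt(source()))

  // Example stages: produce 1..5, square each chunk, fold into a sum.
  val numbers: Enumerator[Int] = () => Iterator.from(1).take(5)
  val squares: Enumeratee[Int, Int] = _.map(n => n * n)
  val sum: Iteratee[Int, Int] = _.foldLeft(0)(_ + _)
}
```

NutshellPipeline.run(numbers, squares, sum) evaluates to 55. Because each stage is just a value with a type, any stage can be swapped for another with matching types.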

Legacy Data Collection

Our legacy collection framework was written in Java and built on the java.util.concurrent library. As a consequence, data collection consisted of isolated nodes fetching user data sequentially with convoluted succeed-or-retry semantics in blocking threads. As our datasets grew with an ever increasing user base, the inefficiencies of the system started to become apparent.

Data was fetched and written to disk in much the same way an Apache-based web server services requests: a single thread was responsible for all possible IO encountered in the code path for a single user collection request. At a high level this IO consists of the following 3 stages (for every user/network/data-type combination):

1. Dispatch the appropriate api calls and parse json responses
2. Write data to disk / HDFS
3. Update progress metrics

These three stages are necessarily sequential, but individually they are highly parallelizable and, more importantly, may be executed asynchronously. For instance, in stage 1 we can issue multiple simultaneous API calls to construct a particular user activity, e.g. John Doe posted “I love pop-tarts for breakfast!” to Facebook and received 20 comments and 200 likes. The activity, which consists of the status message and all 20 comments and 200 likes, can be constructed asynchronously and in parallel.
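Here is a minimal sketch of that asynchronous, parallel construction, using the standard library’s Future in place of Play! Promises. The post, its comments and its likes are requested concurrently and combined once all three arrive. fetchPost, fetchComments and fetchLikes are hypothetical stand-ins for the network API calls and simply return canned data.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

final case class Activity(status: String, comments: Seq[String], likes: Seq[String])

object ParallelActivity {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical API calls; each would be an HTTP request in practice.
  def fetchPost(id: String): Future[String] = Future(s"status of $id")
  def fetchComments(id: String): Future[Seq[String]] = Future((1 to 20).map(i => s"comment $i"))
  def fetchLikes(id: String): Future[Seq[String]] = Future((1 to 200).map(i => s"like $i"))

  // All three requests are started before any result is awaited, so they run
  // concurrently; the for-comprehension only combines the finished values.
  def buildActivity(id: String): Future[Activity] = {
    val post = fetchPost(id)
    val comments = fetchComments(id)
    val likes = fetchLikes(id)
    for (p <- post; c <- comments; l <- likes) yield Activity(p, c, l)
  }
}

// Usage: Await.result(ParallelActivity.buildActivity("john-doe"), 5.seconds)
```

Creating the three futures before the for-comprehension is the important detail. Writing the calls inside the comprehension would chain them and run them one after another.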

New Collection

Recognizing the gains to be made from a non-blocking, parallelized implementation, we decided to re-architect the collector framework with the awesome Play!+Scala+Akka stack. This stack has many nice features and libraries; of particular interest is the Typesafe implementation of Iteratees. The nice things about this implementation are the pre-written utilities such as Iteratee.foreach and its rich Enumeratee implementation. We also made very heavy use of the Play! WebServices library, which provides a thin Scala wrapper around the Ning asynchronous HTTP client and integrates beautifully with the iteratee library using Play Promises (to be completely integrated with Akka promises in the Play 2.1 release).

Paging Enumerator

Data from a network, accessible via api calls, is conveniently returned as paginated json. To support this we needed a generic and abstract paging mechanism for each type of data we were fetching, whether posts, likes or comments. We exploited the fact that each page of data included a next url for easy pagination, which is very elegantly handled with a fromCallback Enumerator:
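The original listing used Play!’s Enumerator.fromCallback. As a standard-library sketch of the same idea (not the Play! API), a lazy Iterator can carry the next url as its only mutable state. It issues a request only when the consumer asks for another page. The Page type and the fetchPage function are hypothetical, and error handling, retry and backoff are left out here too.

```scala
// A page of json items plus the url of the following page, if any.
final case class Page(items: List[String], next: Option[String])

object Paging {
  // Lazily walks the pages starting at startUrl. The only mutable state is the
  // next url to fetch, and fetchPage runs only when the consumer pulls a page.
  def pages(startUrl: String, fetchPage: String => Page): Iterator[Page] =
    new Iterator[Page] {
      private var nextUrl: Option[String] = Some(startUrl)
      def hasNext: Boolean = nextUrl.isDefined
      def next(): Page = {
        val url = nextUrl.getOrElse(throw new NoSuchElementException("no more pages"))
        val page = fetchPage(url)
        nextUrl = page.next
        page
      }
    }
}
```

Calling .take(2) on the result fetches exactly two pages, no matter how many exist. This mirrors using a ‘take’ Enumeratee to cap the number of ws calls.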

This example leaves out error handling, retry and backoff logic but provides a good initial intuition. Given a starting url, the enumerator simply keeps track of one piece of mutable state, the next url, which it continually updates each time the retriever function is invoked. This paging enumerator gives us a way to reactively fetch data, where we do not fetch more data than requested. For instance we could apply this enumerator to a file writing Enumeratee and be assured that we will not overwhelm our disk. Or use a ‘take’ Enumeratee to limit the number of ws calls to a predefined limit. We could then attach a status updating iteratee to this chain and be assured that we will not overwhelm our database. An Enumeratee, for those squinting their eyes, can be thought of as an adapter which transforms data types produced by an Enumerator, to types consumed by an Iteratee.

Enumerator of Enumerators

The paging enumerator is great for keeping track of next page urls, however the json data on each page is typically a list of posts which need individual processing. Each post typically contains an associated set of likes and comments with corresponding fetch urls, which also need to be paged through and joined to the original post json in order to construct a full activity which we can then finally consume. We want to be able to generate and process each activity as a single json document, with all its associated likes and comments meta-data, while still maintaining our requirement of not overwhelming our system, with the additional goal of parallelizing API calls as much as possible. Exploiting the highly composable nature of the Iteratee library, we can process a stream of posts while fetching the associated likes and comments and build each activity in parallel using a combination of Enumeratee.interleave and Iteratee.fold:
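Play!’s Enumerator.interleave merges several streams as their chunks arrive, and Iteratee.fold accumulates them. Here is a synchronous standard-library sketch of that combination (not the Play! API). Likes and comments come in as one merged stream of tagged chunks, and a fold attaches each chunk to the post to produce a single activity document. The chunk types are hypothetical. The round-robin interleave below only approximates arrival-order merging.

```scala
// Tagged chunks from the separate likes and comments streams.
sealed trait Chunk
final case class LikeChunk(user: String) extends Chunk
final case class CommentChunk(text: String) extends Chunk

final case class Activity(post: String, likes: Vector[String], comments: Vector[String])

object BuildActivity {
  // Round-robin merge of two streams; a stand-in for arrival-order interleaving.
  def interleave[A](xs: Iterator[A], ys: Iterator[A]): Iterator[A] =
    new Iterator[A] {
      private var takeFromXs = true
      def hasNext: Boolean = xs.hasNext || ys.hasNext
      def next(): A = {
        val useXs = (takeFromXs && xs.hasNext) || !ys.hasNext
        takeFromXs = !takeFromXs
        if (useXs) xs.next() else ys.next()
      }
    }

  // Fold every chunk into an activity that starts out as the bare post.
  def buildActivity(post: String, likes: Iterator[Chunk], comments: Iterator[Chunk]): Activity =
    interleave(likes, comments).foldLeft(Activity(post, Vector.empty, Vector.empty)) {
      case (acc, LikeChunk(u))    => acc.copy(likes = acc.likes :+ u)
      case (acc, CommentChunk(t)) => acc.copy(comments = acc.comments :+ t)
    }
}
```

The fold never needs the whole list of likes up front. It grows the activity one chunk at a time, which is what keeps memory bounded for posts with very many likes.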

We can now apply our buildActivity method to each post in each list of posts:

As a final step, we need to ‘flatten’ our Enumerator of Enumerators to create an Enumerator of Activities. However at the time of this writing, flattening Enumerators was not part of the standard Play! Iteratee library, so we took the liberty of writing one ourselves:
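For synchronous iterators the standard library already provides flattening, so the shape of these last two steps can be sketched directly. Each page of posts maps to an inner stream of activities, and flatten joins the resulting iterator of iterators into one lazy stream. Post ids as strings and the build function are hypothetical. The custom flatten described above does this same job for asynchronous Play! Enumerators, which is the part plain iterators don’t cover.

```scala
object FlattenActivities {
  // An "iterator of iterators": one inner stream of activities per page of posts.
  def nested(pages: Iterator[List[String]], build: String => String): Iterator[Iterator[String]] =
    pages.map(posts => posts.iterator.map(build))

  // Flatten into a single lazy stream of activities, page after page.
  def activities(pages: Iterator[List[String]], build: String => String): Iterator[String] =
    nested(pages, build).flatten
}
```

Nothing is built until the flattened stream is consumed, so the reactive, fetch-only-on-demand property of the paging step is preserved.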

File Writing Enumeratee

Armed with our shiny new reactive, paginated and parallelized Activity Enumerator, we now need to hook it up to our file writing logic. Let’s assume the internals of the file writing have been abstracted away into one function:

writeToFile(json: JsValue): Promise[Either[Failure, Success]]

From the type signature of writeToFile we can assume it executes asynchronously, finally returning either Failure or Success objects. From this we can construct an Enumeratee which we could then apply our Activity Enumerator to (as part of the overall Iteratee pipeline):
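Here is a sketch of that stage, with standard-library Futures in place of Play! Promises (not the Play! Enumeratee API). Each activity is written asynchronously and mapped to Either[Failure, Activity]. Successes flow on, and failures are carried forward for the status stage instead of being thrown. Failure, Success, the string json field and the in-memory writeToFile are hypothetical stand-ins that follow the signature above.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical result types, named after the writeToFile signature above.
final case class Failure(reason: String)
final case class Success(bytesWritten: Int)
final case class Activity(id: String, json: String)

object FileWriteStage {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Stand-in for the real writer: rejects empty documents, "writes" the rest.
  def writeToFile(json: String): Future[Either[Failure, Success]] = Future {
    if (json.isEmpty) Left(Failure("empty document")) else Right(Success(json.length))
  }

  // The Enumeratee-like step: map an Activity to an Either, passing the Activity
  // itself (not the Success) downstream so later stages can keep working with it.
  def writeStage(a: Activity): Future[Either[Failure, Activity]] =
    writeToFile(a.json).map(result => result.map(_ => a))

  // Apply the stage to a batch of activities, preserving input order.
  def writeAll(activities: Seq[Activity]): Future[Seq[Either[Failure, Activity]]] =
    Future.traverse(activities)(writeStage)
}
```

Returning the Activity rather than the Success is the same design choice the text describes: the stage maps and passes input on instead of consuming it.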

Again, flatMap is not part of the standard Iteratee library:

The file writing Enumeratee simply maps each Activity to an Either, containing either the Failure from writeToFile if it failed, or the Activity for further processing. Note that although conceptually, file writing seems more like the job of an Iteratee, an Enumeratee is the appropriate construct since we do not want to ‘consume’ input from the Enumerator, we want to map input and pass on for later processing. We now have stage 2 of our 3 stage pipeline.

Status Updating Iteratee

As we process each Activity, we want to iteratively collect and report stats, cursor information, errors and other meta-data. Since this is the final stage, we appropriately model it as an Iteratee which acts as the ‘sink’ in our pipeline. For reasons of clarity and brevity, this is a simplified version of the actual Iteratee we use, but it illustrates the point:
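A simplified sink in the same spirit can be written as a plain fold over the Either results (not the Play! Iteratee API). The Stats fields are hypothetical, and failures are reduced to String messages for brevity. The real Iteratee also tracks cursor information and other metadata.

```scala
final case class Stats(succeeded: Int, failed: Int, errors: List[String])

object StatusSink {
  val empty: Stats = Stats(0, 0, Nil)

  // Consume one result, updating counters; the most recent error comes first.
  def step[A](stats: Stats, result: Either[String, A]): Stats = result match {
    case Right(_)  => stats.copy(succeeded = stats.succeeded + 1)
    case Left(err) => stats.copy(failed = stats.failed + 1, errors = err :: stats.errors)
  }

  // The Iteratee-like sink: fold the whole stream into final statistics.
  def run[A](results: Iterator[Either[String, A]]): Stats =
    results.foldLeft(empty)((s, r) => step(s, r))
}
```

Because step handles one element at a time, the same function could report partial statistics periodically instead of only at the end of the stream.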

Putting it All Together

The final step is to hook all these stages up so that they can actually do something meaningful:
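Here is an end-to-end sketch with standard-library stand-ins (synchronous, and not the Play! API). It has four parts: a lazily paged source, a step that flattens pages into activities, a write step that maps each activity to an Either, and a folding status sink. All names, the page numbering and the rule that ids ending in “bad” fail to write are hypothetical. The point is that each stage is an independent function and the pipeline is just their composition.

```scala
import scala.collection.mutable

// End-to-end sketch with standard-library stand-ins; not the Play! API.
object Pipeline {
  final case class Page(posts: List[String], next: Option[Int])
  final case class Stats(written: Int, failed: Int)

  // Source (Enumerator role): fetch pages lazily from page 1 until none is next.
  def pages(fetch: Int => Page): Iterator[Page] =
    Iterator.unfold[Page, Option[Int]](Some(1)) {
      case Some(n) =>
        val p = fetch(n)
        Some((p, p.next))
      case None => None
    }

  // Adapter (Enumeratee role): flatten pages of posts into activities.
  def toActivities(ps: Iterator[Page]): Iterator[String] =
    ps.flatMap(page => page.posts.iterator.map(id => s"activity:$id"))

  // Adapter (Enumeratee role): "write" each activity, mapping it to an Either.
  // Hypothetical rule: activities ending in "bad" fail to write.
  def write(store: mutable.Buffer[String], activity: String): Either[String, String] =
    if (activity.endsWith("bad")) Left(s"rejected $activity")
    else { store += activity; Right(activity) }

  // Sink (Iteratee role): fold results into statistics.
  def stats(results: Iterator[Either[String, String]]): Stats =
    results.foldLeft(Stats(0, 0)) {
      case (s, Right(_)) => s.copy(written = s.written + 1)
      case (s, Left(_))  => s.copy(failed = s.failed + 1)
    }

  // The pipeline is just the composition of the stages.
  def run(fetch: Int => Page, store: mutable.Buffer[String]): Stats =
    stats(toActivities(pages(fetch)).map(a => write(store, a)))
}
```

Adding a stage, for example a filter that drops activities older than the last cursor, means inserting one more Iterator-to-Iterator function into run.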

The beauty is in the simplicity, and more importantly, the composability. We could add other pipeline stages simply by implementing an Enumeratee or Iteratee with the proper types, and we will get all the other benefits for free.

Data collection is the foundation of the Klout experience, enabling us to aggregate, analyze and track influence across our social lives. It’s what allows us to highlight our most influential moments.

Want to dabble with Iteratees in production? Come join us and ping us on Twitter at @navgattu, @AdithyaRao, @prantik, and @dyross.

Posted in Big Data, Platform Engineering, Science

Introducing Brickhouse – Major Open Source Release from Klout

January 16th, 2013 by Jerome Banks

Klout is proud to announce the initial release of Brickhouse, an open source project extending Hadoop and Hive with a collection of useful user-defined functions. Its aim is to make the Hive Big Data developer more productive, and to enable scalable and robust dataflows. Klout makes use of many of the projects in the open source Big Data community such as Hadoop, HBase and Hive. We want to contribute the tools we’ve developed and found useful to the open-source community.

Like what you see? Join the Klout engineering team to help us leverage big data!

Why Brickhouse?

Here at Klout, we deal with lots of big data. We also need to move and produce prototypes quickly.   We love the Apache Hive project because it allows us to define a complex chain of map-reduce jobs in a single query.

In order to implement some fairly common use-cases, however, we found functionality missing in the standard Hive distribution. We also found the same functionality being implemented over and over, simply because people didn’t know that a function had already been written. We needed a central repository to encourage re-use within the company. We suspect that some of this functionality has been implemented and re-implemented by various groups around the world, but for various reasons has never been provided in a single location, under a simple license. While there may be some overlap with existing functionality from various sources, we package it in a more convenient manner.

Another pain we felt was the transition from prototype to production.  With Hive, it is fairly easy to come up with a query to generate a one-off dataset, no matter how wacky the requirements. Converting that to a scalable, robust, production pipeline is a different story. We needed not just a library of UDFs, usable by multiple developers, but a library of MapReduce design patterns to produce robust scalable dataflows. In future blog posts, we’ll discuss various techniques to use Brickhouse to turn your Hive queries into scalable data-flows.

What’s in the Brickhouse?

The Brickhouse covers a fairly wide area of functionality, but can be grouped along various themes:


Brickhouse implements a “collect” UDAF for aggregating into lists and maps, as well as other data structure manipulations which were missing from Hive, for combining and aggregating maps and lists, like “set_diff” and “intersect_array”.  We also include UDFs which help you write more efficient queries, like conditional_emit and multiday_count. For example, the “collect_max” UDAF allows you to aggregate a set of top N values over large sets and potentially avoid expensive re-sorts.
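The idea behind a top-N aggregate like collect_max can be sketched outside Hive. Keep a bounded min-heap of size N while scanning, so memory stays at N entries and no global sort is needed. This is a standalone Scala illustration of that technique, not Brickhouse’s UDAF code. The (key, value) input shape with Long values is an assumption.

```scala
import scala.collection.mutable

// Standalone sketch of a top-N ("collect_max"-style) aggregation; not Brickhouse code.
object TopN {
  // Returns the n entries with the largest values, largest first, using a
  // bounded min-heap so memory stays at n entries and no full sort is needed.
  def collectMax(entries: Iterator[(String, Long)], n: Int): List[(String, Long)] = {
    // PriorityQueue dequeues its "largest" element first; reversing the value
    // ordering makes the head the smallest retained value.
    val byValue: Ordering[(String, Long)] = Ordering.by[(String, Long), Long](_._2)
    val heap = mutable.PriorityQueue.empty[(String, Long)](byValue.reverse)
    for (e <- entries) {
      if (heap.size < n) heap.enqueue(e)
      else if (n > 0 && e._2 > heap.head._2) { heap.dequeue(); heap.enqueue(e) }
    }
    // dequeueAll yields smallest first here, so reverse for largest first.
    heap.dequeueAll.reverse.toList
  }
}
```

Partial results of this shape can also be merged by running collectMax again over the union of two top-N lists. That is what makes the approach fit a distributed aggregate.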


Hive provides some simple support for JSON, with get_json_object, json_tuple, and the various JSON SerDes out there. These are great, but we often need to access JSON with more complicated formats, and would like to access them from a UDF, rather than having to create a new table all the time with a JSON SerDe.
The Brickhouse UDFs json_map and json_split allow you to convert JSON directly into maps and arrays. The UDF to_json outputs JSON text for arbitrary Hive structures, and from_json allows you to parse arbitrary JSON into the Hive type of your specification. This makes it easier to interact with external systems which use JSON as a data representation.


For scalable reach estimations, we provide a simple sketch set implementation. The sketch set UDFs sketch_set, combine_sketch, and estimated_reach allow you to estimate the number of unique values in a set, without having to store each member in an array or perform an expensive query like “SELECT COUNT( DISTINCT )”. This allows you to estimate reach for very large sets, without having to re-sort all of your data.
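A k-minimum-values (KMV) sketch is one standard way to build such a structure. Hash every value, keep only the k smallest distinct hashes, and estimate the distinct count from how tightly those hashes are packed. The code below is a standalone Scala illustration of that technique. The hash (MurmurHash3 from the Scala standard library) and the value of k are our assumptions, and Brickhouse’s implementation details may differ. Two sketches combine by taking the k smallest hashes of their union, which mirrors what a combine_sketch step needs to do.

```scala
import scala.collection.immutable.TreeSet
import scala.util.hashing.MurmurHash3

// KMV sketch: retains the k smallest distinct 32-bit hashes seen so far (k >= 2).
final case class KmvSketch(k: Int, hashes: TreeSet[Long] = TreeSet.empty[Long]) {
  // Hash mapped to an unsigned value in [0, 2^32).
  private def hashOf(value: String): Long = MurmurHash3.stringHash(value).toLong & 0xffffffffL

  // Add a value, dropping the largest retained hash if more than k are kept.
  def add(value: String): KmvSketch = {
    val withH = hashes + hashOf(value)
    copy(hashes = if (withH.size > k) withH - withH.last else withH)
  }

  // Merge two sketches with the same k: union, then keep the k smallest hashes.
  def combine(other: KmvSketch): KmvSketch = copy(hashes = (hashes ++ other.hashes).take(k))

  // Exact (up to hash collisions) while fewer than k hashes are kept; otherwise
  // (k - 1) divided by the k-th smallest hash normalized to [0, 1).
  def estimatedReach: Double =
    if (hashes.size < k) hashes.size.toDouble
    else (k - 1) / (hashes.last.toDouble / 4294967296.0)
}
```

With k = 1024 the relative standard error is roughly 1/sqrt(k), about 3 percent, and the sketch never holds more than 1024 numbers however large the input.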


We’ve wrapped the Bloom filter implementation shipped with the Hadoop/HBase distribution into UDFs. This lets you test set membership for large sets. Brickhouse provides the bloom and distributed_bloom functions to generate Bloom filters for large sets and store them in the distributed cache, where they can be used to avoid an expensive join.
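The Brickhouse UDFs wrap Hadoop's Java implementation; the Python below is not that code, just a minimal, generic Bloom filter showing why it helps. Membership checks can return false positives but never false negatives, so a compact filter built from the small side of a join can cheaply discard most non-matching rows from the large side before the exact join runs. Sizing uses the standard formulas m = −n ln p / (ln 2)² bits and k = (m / n) ln 2 hash functions; the class, the double-hashing scheme, and the data are hypothetical.

```python
import hashlib
import math


class BloomFilter:
    """Bit array plus k hashed positions per item: no false negatives, tunable false positives."""

    def __init__(self, expected_items, false_positive_rate=0.01):
        n = max(1, expected_items)
        # Standard sizing: m = -n ln(p) / (ln 2)^2 bits, k = (m / n) ln 2 hash functions
        self.m = max(8, math.ceil(-n * math.log(false_positive_rate) / math.log(2) ** 2))
        self.k = max(1, round(self.m / n * math.log(2)))
        self.bits = bytearray((self.m + 7) // 8)

    def _positions(self, item):
        # Double hashing: derive k bit positions from two 64-bit halves of one digest
        digest = hashlib.sha256(str(item).encode("utf-8")).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big") | 1
        return [(h1 + i * h2) % self.m for i in range(self.k)]

    def add(self, item):
        for pos in self._positions(item):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, item):
        return all(self.bits[pos // 8] & (1 << (pos % 8)) for pos in self._positions(item))


# Hypothetical data: a small set of users of interest and a large table of (user_id, event) rows
interesting_users = {f"user-{i}" for i in range(0, 10_000, 7)}
bloom = BloomFilter(len(interesting_users))
for user_id in interesting_users:
    bloom.add(user_id)

events = [(f"user-{i}", "click") for i in range(10_000)]
# Cheap pre-filter: the compact filter (not the full set) is what would be shipped to every task
candidates = [row for row in events if bloom.might_contain(row[0])]
# Exact check on the much smaller candidate list removes the few false positives
joined = [row for row in candidates if row[0] in interesting_users]
```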


To productionize our code, we’ve added UDFs that throw an error and “blow the whistle” if a sanity test fails. We validate certain datasets at various points in the chain with the assert UDF, to make sure processing stops when we discover a serious problem. We also provide write_to_graphite and write_to_tsdb so we can more easily track metrics for each production flow. This helps us track the growth of our data and plan accordingly.
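Brickhouse's assert and write_to_graphite are Hive functions. As an illustration of the same "blow the whistle" idea, here is a hypothetical Python validation step that raises when a dataset fails a sanity test and otherwise formats a metric line in Graphite's plaintext protocol (metric path, value, Unix timestamp). The function names, the metric path, and the 50% threshold are made up for the example, and actually sending the line to a Graphite server is left out.

```python
import time


class SanityCheckFailed(Exception):
    """Raised to stop a pipeline stage when a dataset looks wrong."""


def assert_true(condition, message):
    # Fail loudly instead of letting a bad dataset flow downstream
    if not condition:
        raise SanityCheckFailed(message)
    return True


def graphite_line(metric_path, value, timestamp=None):
    # Graphite plaintext protocol: "<metric path> <metric value> <metric timestamp>\n"
    ts = int(time.time()) if timestamp is None else int(timestamp)
    return f"{metric_path} {value} {ts}\n"


def validate_daily_counts(today_rows, yesterday_rows, max_drop=0.5):
    """Hypothetical check: halt if today's row count fell by more than max_drop."""
    assert_true(today_rows > 0, "no rows produced today")
    assert_true(
        today_rows >= yesterday_rows * (1 - max_drop),
        f"row count dropped from {yesterday_rows} to {today_rows}",
    )
    return graphite_line("pipeline.daily_scores.rows", today_rows)


print(graphite_line("pipeline.daily_scores.rows", 1200000, timestamp=1341100800), end="")
# prints: pipeline.daily_scores.rows 1200000 1341100800
```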


We’re also experimenting with different ways to integrate Hive and HBase. With the hbase_batch_put UDAF and a well-clustered query, you can do bulk imports into HBase directly from Hive, without killing your region-servers.

How can I use the Brickhouse?

For now, we are providing Brickhouse in source-code form at https://github.com/klout/brickhouse, and you need to perform your own local build. At some point we will publish it to public Maven repositories, so that it can be declared as a third-party dependency from Maven or Ivy.

  • We assume that you have already been using and loving Hive. Because we use some advanced features, we require at least Hive 0.9.0.
  • Install Git, Maven, and a Java JDK, if you do not have them already.
  • Clone (or fork) the repo from GitHub: https://github.com/klout/brickhouse
  • Run “mvn package” to produce the jar file.
  • Add the jar to your HIVE_AUX_JARS_PATH, or add it to the cache with an “add jar” command.
  • Declarations for all the UDFs are in src/main/resources/brickhouse.hql. Source this file from Hive.

A simple example

Should you use the Brickhouse? What can it help you with? Check out the wiki at https://github.com/klout/brickhouse/wiki/ for more specific documentation. We plan on adding more documentation and more blog posts, but here is an example that demonstrates the power of Brickhouse.

One common use-case we have here at Klout is calculating the top-k values. For example, who are the top 50 users with the highest Klout scores, for some grouping?

Imagine you had a table user_score with columns user_id and score. Say you had another table containing each user’s primary location, called user_geo, with columns user_id and region, where region holds the user’s state. How could we find the users with the top 50 Klout Scores for each state? This is certainly possible in multiple ways. To join and sort the tables, you could run a query like:

select state,
   user_id,
   score,
   group_count( state )
from (
  select *
  from (
    select
      geo.region as state,
      sc.user_id,
      sc.score
    from
      user_geo geo
    join
      user_score sc
    on( geo.user_id = sc.user_id )
  ) join_geo
  distribute by state
  sort by state, score desc
) score_distrib
where group_count( state ) <= 50;

This joins the two tables, then distributes and sorts the rows to get the top values. The group_count UDF returns the number of times in a row that the column “state” has had the same value, and the clause “where group_count( state ) <= 50” caps the users at 50 per state.
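To make the group_count semantics concrete, here is a small single-process Python model of the query: sort by state and descending score (the work DISTRIBUTE BY / SORT BY spreads across reducers), number consecutive rows with the same state, and keep the first n. It illustrates the logic only, not how Hive executes it, and the function names are hypothetical. The counter only works because all rows for a state arrive contiguously, which is why the sort matters.

```python
def group_count(keys):
    """Yield, for each key, how many times in a row that key has appeared so far."""
    previous = object()  # sentinel that never equals a real key
    count = 0
    for key in keys:
        count = count + 1 if key == previous else 1
        previous = key
        yield count


def top_n_per_group(rows, n):
    """rows are (state, user_id, score); mimics SORT BY state, score DESC + group_count filter."""
    ordered = sorted(rows, key=lambda r: (r[0], -r[2]))
    counts = group_count(r[0] for r in ordered)
    return [row for row, c in zip(ordered, counts) if c <= n]


rows = [("CA", "u1", 90), ("NY", "u4", 95), ("CA", "u3", 70), ("CA", "u2", 80), ("NY", "u5", 60)]
print(top_n_per_group(rows, 2))
# prints: [('CA', 'u1', 90), ('CA', 'u2', 80), ('NY', 'u4', 95), ('NY', 'u5', 60)]
```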

But this gives one line per user/state pair. We’d prefer one line per state, with a map<string,double> of user scores. This is where the “collect” UDAF becomes useful. We can do something like the following to aggregate into a map:

select state,
   collect( user_id, score ) as score_map
from (
  select state,
    user_id,
    score,
    group_count( state )
  from (
    select *
    from (
      select
        geo.region as state,
        sc.user_id,
        sc.score
      from
        user_geo geo
      join
        user_score sc
      on( geo.user_id = sc.user_id )
    ) join_geo
    distribute by state
    sort by state, score desc
  ) score_distrib
  where group_count( state ) <= 50
) score_ranked
group by state;
This works, but notice the extra aggregation: it adds another MapReduce job to our flow. This query could also be very unbalanced, because some states are much larger than others. Even though the output is capped, a large number of values may still need to be sorted, and that sorting could be very unbalanced.

However, Brickhouse provides a collect_max UDAF, which is similar to the “collect” UDAF but stores only the top 50 values seen. The query can be rewritten as:

select state,
   collect_max( user_id, score ) as score_map
from (
  select geo.region as state, sc.user_id, sc.score
  from
     user_geo geo
  join
     user_score sc
  on( geo.user_id = sc.user_id )
) join_geo
group by state;

This reduces the number of steps, because the collect is performed in the same step as the ranking. The flow doesn’t need to sort by score; it only needs to distribute by state.

If Hive map-side aggregation is enabled (hive.map.aggr=true), performance improves even further. The top 50 values can be accumulated on the map side before being flushed to the sorter, so more processing is done sooner. The job should also stay fairly well balanced, since the data structure holding the top 50 values for a million records takes as much space as the one for only 100 records. This is just one example.
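Map-side aggregation works here because a top-n aggregation can be merged: the top n of a whole group is always contained in the union of each mapper's top n. Here is a Python sketch of that idea, loosely following the iterate / terminatePartial / merge / terminate phases of Hive's GenericUDAFEvaluator. It is an illustration, not Brickhouse's collect_max code, and the class name, four-way split, and random data are hypothetical. Every buffer holds at most n entries however many rows it sees, which is what keeps the reducers balanced.

```python
import heapq
import random


class CollectMaxBuffer:
    """Bounded top-n aggregation buffer, loosely mirroring Hive's UDAF phases."""

    def __init__(self, n=50):
        self.n = n
        self._heap = []  # min-heap of (score, key); heap[0] is the weakest retained entry

    def iterate(self, key, score):
        # Fold one input row into the buffer, never holding more than n entries
        if len(self._heap) < self.n:
            heapq.heappush(self._heap, (score, key))
        elif score > self._heap[0][0]:
            heapq.heapreplace(self._heap, (score, key))

    def terminate_partial(self):
        # What a mapper would flush to the shuffle: at most n (score, key) pairs
        return list(self._heap)

    def merge(self, partial):
        # Reduce side: fold in another buffer's partial result
        for score, key in partial:
            self.iterate(key, score)

    def terminate(self):
        # Final map of key -> score, highest score first
        return {key: score for score, key in sorted(self._heap, reverse=True)}


random.seed(7)
rows = [(f"user-{i}", random.random() * 100) for i in range(10_000)]  # hypothetical (user_id, score)
splits = [rows[i::4] for i in range(4)]  # pretend four mappers each see a quarter of the rows

partials = []
for split in splits:
    buf = CollectMaxBuffer(n=50)
    for user_id, score in split:
        buf.iterate(user_id, score)
    partials.append(buf.terminate_partial())

reducer = CollectMaxBuffer(n=50)
for partial in partials:
    reducer.merge(partial)
top50 = reducer.terminate()
```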


The Klout engineering team plans to continue extending the Brickhouse and improving its overall quality. We’ll add functionality that is not available elsewhere and that solves problems for many groups dealing with big data. We may decide to exclude certain things if their scope is limited to a narrow domain.

Let us know what you think: whether you find it useful, and what needs more work. More complete documentation is available on the GitHub wiki at https://github.com/klout/brickhouse/wiki/UDF-Overview. If you find a bug, please file it at https://github.com/klout/brickhouse/issues.

Posted in Big Data, Platform Engineering | No Comments »

How Klout Turned Big Data Into Giant Data

October 11th, 2012 by Dave Mariani

Last November, I wrote a post called Big Data, Bigger Brains. In that post, I wrote about how we were able to make business intelligence (BI) work in a Big Data environment at Klout. That was a big step forward for Klout, but our work wasn’t yet done. Recently we launched a whole new website and a new Klout Score that substantially upped the stakes.

Really Big Data

At Klout, we need to perform quick, deep analysis on vast amounts of user data. We need to set up complex alerts and monitors to ensure that our data collection and processing is accurate and timely. With our new Score, we quadrupled the number of signals we collect. This means that we now collect and normalize more than 12 billion signals a day into a Hive data warehouse of more than 1 trillion rows. In addition, we have hundreds of millions of user profiles that translate into a massive “customer” dimension, rich with attributes. Our existing configuration, which connected Hive to SQL Server Analysis Services (SSAS) using MySQL as a staging area, was no longer feasible.

Bye-Bye MySQL

So, how did we eliminate MySQL from the equation? Simple. We leveraged Microsoft’s Hive ODBC driver and SQL Server’s OpenQuery interface to connect SSAS directly to Hive. Kay Unkroth and Denny Lee of Microsoft and I wrote a whitepaper detailing the specifics. Now, we process 12 billion rows a day by leveraging the power of Hadoop and HiveQL right from within SSAS. For our largest cube, updating a day’s worth of data (yes, 12 billion rows) takes about an hour. By combining a great OLAP engine like SSAS with Hive, we get the best of both worlds: 1 trillion rows of granular data exposed through an interactive query interface compatible with existing business intelligence tools.

What I really want for Christmas…

So, what’s wrong with this story? What I really want is for the OLAP engine itself to reside alongside Hive/Hadoop, rather than live alone in a non-clustered environment. If the multidimensional engine resided inside HDFS, we could eliminate the double write (write to HDFS, write to the cube) and leverage the aggregate memory and disk available across the Hadoop cluster for virtually unlimited scale-out. As an added benefit, a single write would eliminate latency and vastly simplify the operational environment. I can dream, can’t I?

Do you want to take on Big Data challenges like this? We’re looking for great engineers who can think outside the box. Check us out.

Posted in Big Data | 4 Comments »

Klout Gets Hacking

October 5th, 2012 by Tyler Singletary

The end of the quarter is the perfect time to take a day (or two or three) to celebrate the awesome work completed over the past 90 days. And what better way to celebrate than…more work?! Well, a hackathon to be precise.


The Front End team discussing their Hackathon project
For those unfamiliar, a hackathon is a contest where developers (designers, marketers and product-minded folk are also welcome) work for a designated amount of time, usually 24 hours or a weekend, to take a new product or piece of software from inception to prototype. It’s a bit of an engineering tradition, picking up steam in startups and particular industries, many of which host internal and external events and award prizes to the winners.


Klout team working on their code and design during the Klout Hackathon
Klout hosted a 24-hour internal hackathon for Klout employees to work on pet projects, innovations, and new ideas using our usual product roadmap. While we can’t promise that you’ll see any of these work their way onto Klout.com, we’re excited to find a place for many of the projects in the future. The turnout was impressive: 17 teams of one to four people worked on ambitious projects. Here are a few of them:

The winning app was an Influence Landscape Visualization by Keith Walker. Keith took a concept we’ve talked about many times, and one that a few of our Partner Developers have implemented on our API in the past: plot the influence graph of a user and make it a visual experience. An influencer’s connections are displayed as dots that grow larger with their Klout Score. You can drill down on any of these connections to see that influencer’s connections, and so on down the rabbit hole.

Our second winning hack looked at surfacing new insights from our recent Klout Moments feature, casting them through the lens of location and your immediate network. This team (Mao Ye, Mark Azevedo, Sreevatsan Raman, and Adithya Rao) actually conceived this as two separate projects, then realized they could easily mesh into one. Moments can be viewed by city, so you can see, for example, the top influential content in San Francisco. Curious what the people you influence are saying? Their recommended moments surface their top content.

The rest of the submissions were no less notable, including heat cloud visualizations, sentiment analysis, scientific discovery of movers and shakers, real-time backend tracking, an influential business radar, a new scoring system for career growth, and a uniquely polished presentation for affiliate marketing for bars and clubs. Even our CEO, Joe Fernandez, piloted a group to success, offering a new take on creating a street team for mobilizing your influencer network to drive change and get the word out utilizing our notification system and circles.

We plan to make this a quarterly tradition, and we expect the quality of work to be top-notch each time and to find its way into the Klout experience. All of the teams did a fantastic job and showed a diversity of approach and vision while also rallying cohesively around concepts we’ve collectively considered for some time now. We couldn’t be more proud.

Posted in Culture | No Comments »

Scaling the Klout API with Scala, Akka, and Play

October 2nd, 2012 by David Ross

Back in March, Felipe Oliveira wrote about Klout’s new Sexy API. We had just released the Scala Play! Framework API infrastructure that we had spent the previous few months writing. Not only did it represent a big step forward on the tech side, but it was also an important cultural change for Klout. Previously, disparate teams were responsible for their own serving infrastructure; now, having a central platform has empowered Klout to scale to a billion API requests per day and export powerful new functionality to partners.

But we still had a lot of work to do back then. By now, six months after launch, we’ve made some serious improvements to the API’s scalability and availability using Akka’s rich toolset for concurrent programming. Though Akka is mostly famous for its implementation of the Actor Model, I’m going to talk about two other Akka features, Futures and Agents.

Scalability with Akka Futures

For some background on the scalability problems we face, consider that serving a simple profile page like mine requires hundreds of lookups to several different datastores. Because of the scale of Klout’s data pipeline (expect a future blog post by Sreevatsan Raman to shed more light), we need to store users’ Scores, Moments, Topics, and other data in different datastores. As such, our app is very IO-bound, and optimizing our IO usage was one of our biggest priorities. We needed to do our IO concurrently.


Akka Futures (soon to be part of the Scala Standard Library) have proven to be the ideal tool for concurrent work. A Future represents an asynchronous computation, and many Futures can run concurrently. The Future API in Akka is very rich, but the key for us is its monadic nature. If you don’t know what a monad is: in Scala, loosely speaking, it is something that has map and flatMap methods. This allows Futures to be composed into new Futures with the syntactic sugar of a for expression. Compare this to Java Futures (java.util.concurrent.*), which have no means of composition.

Consider the following example, which has three methods that call different datastores and each return a Future:

Additionally, we have a resulting type we’d like to combine the results into:

Now, how should we do this? The non-monadic way, similar to how we would do it in Java, is to start each of the tasks, wait for them, and then build the result:
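A rough analogue of this blocking approach in Python uses concurrent.futures, whose futures, like the java.util.concurrent ones, are normally consumed by blocking on result(). This is a sketch, not Klout's Scala code: the three lookup functions and the Profile type are hypothetical stand-ins for the datastore calls and result type described above, with time.sleep standing in for network IO.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass


@dataclass(frozen=True)
class Profile:
    score: float
    topics: tuple
    moments: tuple


# Hypothetical datastore lookups; time.sleep stands in for network IO
def fetch_score(user_id):
    time.sleep(0.05)
    return 72.5


def fetch_topics(user_id):
    time.sleep(0.05)
    return ("scala", "hadoop")


def fetch_moments(user_id):
    time.sleep(0.05)
    return ("moment-1",)


def get_profile_blocking(executor, user_id):
    # Start the three lookups concurrently...
    score_f = executor.submit(fetch_score, user_id)
    topics_f = executor.submit(fetch_topics, user_id)
    moments_f = executor.submit(fetch_moments, user_id)
    # ...then park the calling thread until each one finishes (result() blocks)
    return Profile(score_f.result(), topics_f.result(), moments_f.result())


with ThreadPoolExecutor(max_workers=3) as pool:
    print(get_profile_blocking(pool, "user-42"))
```

The lookups do overlap, but the caller's thread sits idle inside result() for the whole round trip.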

This is not ideal for a few reasons:

1. We are blocking on the execution of the concurrent tasks, which means the thread running this code must wait idly, wasting resources while the app performs network IO.
2. It is rather verbose and difficult to maintain.
3. This function violates the Single Responsibility Principle, because it is responsible for both the waiting of the Future and business logic for combining the results.

A better way to do this is with Future composition:

Notice how much more readable the code is. The for expression is sugar for calling the map and flatMap methods on the Futures, and the benefit is that we can refer to the results of the Futures in the yield block without waiting for them. This makes the method read more like a workflow and it is no longer concerned with waiting for the completion of the tasks.

One difference between the two methods is that the first returns a raw Profile and the second returns a Future[Profile]. This led us to an important realization while adding Futures to our code, which we captured as a few simple rules:

1. All methods that do IO should return a Future
2. Never block a thread waiting on a Future
3. Therefore, all methods that call other methods that return Futures must themselves return Futures.

In this way, we use Future almost like an IO monad (see this post for an introduction to functional IO). This allows us to push the Futures all the way up our call stack, finally wrapping them in Play’s AsyncResult in controller methods. Play handles these results in a non-blocking way, so we can be as efficient with IO as possible (see the Play documentation for more detail).
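Python has no for expression over futures, but async/await gives a similar workflow-style composition. Here is a minimal asyncio sketch of the composed version and the three rules, again with hypothetical lookups and a hypothetical Profile type rather than Klout's Scala code: each IO function is a coroutine (rule 1), get_profile awaits asyncio.gather instead of blocking a thread (rule 2), and anything that awaits must itself be async, all the way up to a hypothetical request handler (rule 3). The single asyncio.run call at the outer edge plays roughly the role that Play's handling of AsyncResult plays for a controller.

```python
import asyncio
from dataclasses import dataclass


@dataclass(frozen=True)
class Profile:
    score: float
    topics: tuple
    moments: tuple


# Rule 1: every IO function returns an awaitable (asyncio.sleep stands in for network IO)
async def fetch_score(user_id):
    await asyncio.sleep(0.05)
    return 72.5


async def fetch_topics(user_id):
    await asyncio.sleep(0.05)
    return ("scala", "hadoop")


async def fetch_moments(user_id):
    await asyncio.sleep(0.05)
    return ("moment-1",)


# Rule 2: compose instead of blocking; the three lookups run concurrently
async def get_profile(user_id):
    score, topics, moments = await asyncio.gather(
        fetch_score(user_id), fetch_topics(user_id), fetch_moments(user_id)
    )
    return Profile(score, topics, moments)


# Rule 3: a caller of an async function is itself async, up to the request handler
async def profile_handler(user_id):
    profile = await get_profile(user_id)
    return {"user_id": user_id, "score": profile.score, "topics": list(profile.topics)}


# Only the outermost edge drives the event loop
print(asyncio.run(profile_handler("user-42")))
```

Because the three lookups are awaited together, their latencies overlap instead of adding up, and no thread sits idle waiting on them.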

Overall, using Akka Futures allows us to write more efficient and more readable code. I suggest becoming very familiar with the methods on Futures and the different ways to compose them, especially since they will ship with Scala 2.10 and later. The ability to write concurrent IO so easily is the key to our API’s performance and scalability.

High Availability with Apache Zookeeper and Akka Agents

Another lesson from the last six months is that dynamic service discovery is key. For example, our MySQL cluster has one master and several slaves, and we spread read requests across the slaves as much as possible. Sometimes we need to dynamically remove slave nodes from the pool because of degraded performance or scheduled maintenance. Since we run a large production cluster of API nodes, we need to be able to make these updates without redeploying or downtime, giving our clients the best experience possible and guaranteeing that all nodes update within seconds.

Apache Zookeeper was an obvious solution for distributed configuration. To start, we created a simple wrapper for ZooKeeper on top of Twitter’s ZooKeeper client written in Scala. We use this wrapper to watch ZooKeeper nodes, issuing a callback inside our application whenever a modification happens:

But where should these callbacks go? One solution would be to create a service like this:

This service would keep track of nodes to read from, and we could use the ZooKeeper client to call updateState on the service. Any read request for MySQL would use the service to determine the pool of nodes to read from.

Again, there are a couple of problems with this:

1. The same class that deals with business logic is responsible for making updates, so the interface is not safe. Clients of this service would be able to make changes when we only want Zookeeper callbacks to make these changes.
2. This service would be prone to concurrency issues when multiple threads are making updates. These issues get much worse if we want our “MySQL State” to track more than just “up” or “down”.

At first, we thought that Actors would be a good solution for this problem. However, we soon learned that because actors process all messages one at a time, the reads would get backed up. Also, the interface to the actors is Futures, which is more complex than necessary.

Akka thankfully provides an implementation of Agents, based on the Clojure concept of the same name. Agents wrap an instance of some type of state and support asynchronous, single-threaded updates and synchronous reads. For an agent of type T, an update is a function T => T, and the agent updates its state by setting its value to the result of applying that function to its previous value.

Here’s a similar implementation of the service above but using an Agent:

As you can see, the service is much simpler and the interface hides the updating access. Also, because the update functions are applied one at a time and simply add or remove nodes from the set, there is no concurrency issue. The biggest win, however, is that this allows us to think about our state as an immutable data structure, only responsible for dealing with business logic, and wrap the updating logic in the Agent. This gives us an elegant structure to our code.
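Akka's Agent isn't available in Python, so here is a minimal stand-in built on a single-worker ThreadPoolExecutor: updates (functions from T to T) are queued and applied one at a time, while reads return the current immutable value immediately. The read-pool wrapper, the callback names, and the host names are hypothetical; in a real system the callbacks would be wired to a ZooKeeper watcher. It mirrors the structure described above: request handlers only get a read-only view, and only the callbacks can send updates.

```python
import random
from concurrent.futures import ThreadPoolExecutor


class Agent:
    """Agent-like holder: asynchronous one-at-a-time updates, immediate reads."""

    def __init__(self, initial):
        self._value = initial
        self._updates = ThreadPoolExecutor(max_workers=1)  # one worker = updates are serialized

    def get(self):
        return self._value  # never waits for queued updates

    def send(self, fn):
        """Queue fn(current) -> new value; returns a Future of the new value."""
        def apply():
            self._value = fn(self._value)
            return self._value
        return self._updates.submit(apply)

    def close(self):
        self._updates.shutdown(wait=True)


class MySqlReadPool:
    """Read-only view handed to request handlers (hypothetical name)."""

    def __init__(self, agent):
        self._agent = agent

    def nodes(self):
        return self._agent.get()

    def pick(self):
        # Spread reads across whichever slaves are currently in the pool
        return random.choice(sorted(self._agent.get()))


def zookeeper_callbacks(agent):
    """Hypothetical callbacks a ZooKeeper watcher would invoke; the only writers."""

    def on_slave_up(host):
        return agent.send(lambda nodes: nodes | {host})

    def on_slave_down(host):
        return agent.send(lambda nodes: nodes - {host})

    return on_slave_up, on_slave_down


agent = Agent(frozenset({"db-slave-1", "db-slave-2"}))
pool = MySqlReadPool(agent)
on_up, on_down = zookeeper_callbacks(agent)
on_down("db-slave-2").result()  # .result() only so the demo output is deterministic
on_up("db-slave-3").result()
print(sorted(pool.nodes()))  # ['db-slave-1', 'db-slave-3']
agent.close()
```

Because the state is an immutable frozenset and every change goes through the single update worker, concurrent callbacks cannot lose each other's updates.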

We like this pattern so much that we use it wherever we have dynamic configuration that depends on ZooKeeper. It’s a great abstraction that allows for code that is both more reliable and more readable. And having this mechanism for dynamic service discovery gives the API the fault tolerance and high availability it needs to meet its SLA.

Even if you are not using the Actor Model, Akka provides many tools to improve large concurrent enterprise systems. In some cases, other abstractions are simpler to use and require less boilerplate than Actors. At Klout, we believe in using the right tool for the job, so to help the Klout API meet its SLA, we have relied heavily on Futures and Agents. In doing so, we hope to push more and more of Klout’s data into the world. Let us know if you want to help.

Posted in Platform Engineering | No Comments »