Exploring Dynamic Loading of Custom Filters in HBase

Any program that pulls data from a large HBase table containing terabytes of data spread over many nodes needs to put some thought into how it retrieves that data. Without that thought, the program may spend its time waiting for, and then processing, a lot of unnecessary data, to the point where it becomes useless (whether it is a single-threaded client or a MapReduce job). HBase’s Scan API helps here. A Scan configures the parameters of the data retrieval, including the columns to include, the start and stop rows, and the batch size.
 
A Scan can also include a filter, which can be the most effective way to improve the performance of scans over an HBase table. The filter runs on the regionservers and screens unwanted rows out of the result set. A well-designed filter is performant and minimizes the data scanned and returned to the client. There are many useful Filters that come standard with HBase, but sometimes the best solution is a custom Filter tuned to your table’s schema.

Before your custom filter can be used, it has to be compiled, packaged in a jar, and deployed to all the regionservers. The HBase cluster must then be restarted so that the regionservers pick up the new code on their classpaths. Therein lies the problem: an HBase restart takes a non-trivial amount of time (although rolling restarts mitigate that somewhat), and the downtime is significant for a cluster as big as Flurry’s.

This is where dynamic filters come in. The word ‘dynamic’ refers to the on-demand loading of these custom filters, just like loading external modules at runtime in a typical application server or web server. In this post, we will explore an approach that makes this possible in the cluster.

How It Works Now
Before we dive into the workings of dynamically loading filters, let’s see how regular custom filters work.

Assuming the custom filter has already been packaged in a jar on the regionservers’ classpath, the client can simply use the filter, e.g. in a full table scan, like this:

https://gist.github.com/4227003

The filter has to be shipped to the regionservers, where it runs server-side. The custom filter reaches the regionservers as follows:

  1. The client serializes the Filter’s class name and its state into a byte stream; the state is written by calling the CustomFilter’s write(DataOutput) method.
  2. The client sends this byte stream to the regionservers that take part in the scan.
  3. Server-side, the RegionScanner re-creates the Scan, including the filter, from the byte stream. Part of the stream is the filter’s fully qualified class name, which the default classloader uses to load the class via Class.forName().
  4. The filter is then instantiated using its empty constructor and configured from the rest of the byte stream by its readFields(DataInput) method (see org.apache.hadoop.hbase.client.Scan for details).
  5. The filter is then applied as a predicate on each row.
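To make steps 1 through 4 concrete, here is a minimal, self-contained sketch of the same Writable-style round trip. It does not use HBase: SketchFilter, PrefixSketchFilter and FilterWire are hypothetical stand-ins for the Filter interface, a custom filter and the Scan’s serialization code, and a plain String row key replaces HBase’s own row types. The shape is what matters: class name first, then the filter’s own state, then Class.forName() plus the empty constructor on the receiving side.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical stand-in for HBase's Writable-based Filter contract (not the HBase API). */
interface SketchFilter {
    void write(DataOutput out) throws IOException;      // serialize this filter's state
    void readFields(DataInput in) throws IOException;   // restore that state on the server
    boolean filterRow(String rowKey);                   // true means "exclude this row"
}

/** Hypothetical custom filter: excludes rows whose key does not start with a prefix. */
class PrefixSketchFilter implements SketchFilter {
    private String prefix;

    public PrefixSketchFilter() {}                      // the server side needs the empty constructor
    public PrefixSketchFilter(String prefix) { this.prefix = prefix; }

    public void write(DataOutput out) throws IOException { out.writeUTF(prefix); }
    public void readFields(DataInput in) throws IOException { prefix = in.readUTF(); }
    public boolean filterRow(String rowKey) { return !rowKey.startsWith(prefix); }
}

/** Hypothetical stand-in for the Scan's filter (de)serialization. */
class FilterWire {
    /** Steps 1-2: write the class name, then let the filter write its own state. */
    static byte[] serialize(SketchFilter filter) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeUTF(filter.getClass().getName());
            filter.write(out);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Steps 3-4: load the class by name, call the empty constructor, then readFields. */
    static SketchFilter deserialize(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            // Default classloading: the class must already be on this JVM's classpath.
            Class<?> cls = Class.forName(in.readUTF());
            SketchFilter filter = (SketchFilter) cls.getDeclaredConstructor().newInstance();
            filter.readFields(in);
            return filter;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot re-create filter", e);
        }
    }
}
```

Because deserialize() relies on Class.forName() with the default classloader, the filter class has to be on the receiving JVM’s classpath already, which is exactly the restriction discussed next.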

[Figure: (1) myfilter.jar, containing our custom filter, resides locally on the regionservers’ classpath; (2) the custom filter is instantiated using the default ClassLoader.]

Once deployed, this definition of our custom filter is static. We can make ad hoc queries using any combination of the deployed filters, but if we need to add, extend or replace a custom filter, it has to be added to the regionservers’ classpath, and we have to wait for the next restart before it can be used.

There is a faster way.

Dynamic Loading
A key takeaway from the previous section is that Filters are Writables – they are instantiated using the name of the class and then configured by a stream of bytes that the Filter understands. This makes the filter configuration highly customizable and we can use this flexibility to our advantage.

Rather than create a regular Filter, we introduce a ProxyFilter, which acts as the extension point through which we can load our custom Filters on demand. At runtime, the ProxyFilter loads the custom filter class itself.

Let’s look at some example code. To start with, there is just a small change we have to make on the client; the ProxyFilter now wraps the Filter or FilterList we want to use in our scan.

https://gist.github.com/4227336

The ProxyFilter passes its own class name to be instantiated on the server side, and then serializes the wrapped custom filter after it.

https://gist.github.com/4227434

On the regionserver the ProxyFilter is initialized in the same way as described in the previous section. The byte stream that follows should minimally contain the filter name and its configuration byte array. In the ProxyFilter’s readFields method, the relevant code looks like this.

https://gist.github.com/4227524

This is very much like how the default Scan re-creates the Filter on the regionserver with one critical difference – it uses a filterModule object to obtain the Class definition of the custom filter. This module retrieves the custom filter Class and returns it to ProxyFilter for instantiation and configuration.
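The sketch below mirrors that structure outside of HBase, with the usual caveats: WritableRowFilter, FilterModule, ProxyFilterSketch and MaxLengthFilter are hypothetical names, not the classes in the example code [5]. The proxy writes the wrapped filter’s class name and state, and on the way back in it asks a pluggable module for the Class instead of calling Class.forName() itself. Swapping that module is what lets a different classloader supply the filter.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical Writable-style filter contract (not the HBase API). */
interface WritableRowFilter {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
    boolean filterRow(String rowKey);                   // true means "exclude this row"
}

/** Hypothetical extension point: decides which Class backs a filter name. */
interface FilterModule {
    Class<?> loadFilterClass(String className) throws ClassNotFoundException;
}

/** Hypothetical sketch of the ProxyFilter idea: wrap, serialize, and delegate. */
class ProxyFilterSketch implements WritableRowFilter {
    // Assumption: one module per JVM. The default is ordinary classloading; a jar-backed
    // module with its own classloader would be plugged in here instead.
    static volatile FilterModule module = name -> Class.forName(name);

    private WritableRowFilter inner;

    public ProxyFilterSketch() {}
    public ProxyFilterSketch(WritableRowFilter inner) { this.inner = inner; }

    /** Client side: after the proxy's own name, write the wrapped filter's name and state. */
    public void write(DataOutput out) throws IOException {
        out.writeUTF(inner.getClass().getName());
        inner.write(out);
    }

    /** Server side: like the default path, except the module supplies the Class. */
    public void readFields(DataInput in) throws IOException {
        String name = in.readUTF();
        try {
            Class<?> cls = module.loadFilterClass(name);
            inner = (WritableRowFilter) cls.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IOException("cannot instantiate filter " + name, e);
        }
        inner.readFields(in);
    }

    /** Filter calls are delegated to the enclosed filter. */
    public boolean filterRow(String rowKey) { return inner.filterRow(rowKey); }

    /** Helper: serialize and deserialize through a byte array, standing in for the RPC. */
    static ProxyFilterSketch roundTrip(ProxyFilterSketch proxy) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            proxy.write(out);
            out.flush();
            ProxyFilterSketch copy = new ProxyFilterSketch();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

/** Hypothetical custom filter: excludes row keys longer than a limit. */
class MaxLengthFilter implements WritableRowFilter {
    private int maxLength;

    public MaxLengthFilter() {}
    public MaxLengthFilter(int maxLength) { this.maxLength = maxLength; }

    public void write(DataOutput out) throws IOException { out.writeInt(maxLength); }
    public void readFields(DataInput in) throws IOException { maxLength = in.readInt(); }
    public boolean filterRow(String rowKey) { return rowKey.length() > maxLength; }
}
```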

There can be different strategies for retrieving the custom filter class. Our example code copies the jar file from the Hadoop filesystem to a local directory and delegates the loading of the Filter classes from this jar to a custom classloader [3].

To configure the HDFS path where the module looks for filters.jar, add the following property to hbase-site.xml:

<property>
<name>flurry.hdfs.filter.jar</name>
<value>/flurry/filters.jar</value>
</property>
[Figure: (1) The custom filter jar resides in a predefined directory in HDFS. (2) The proxyfilter.jar containing the extension point resides locally on the regionserver’s classpath. (3) The ProxyFilter is instantiated using the default ClassLoader. (4) If necessary, the rowfilter.jar is downloaded from a preset Path in HDFS; a custom classloader in ProxyFilter then instantiates the correct filter, and Filter interface calls are delegated to the enclosed filter.]

With the ProxyFilter in place, it is now simply a matter of placing or replacing the jar in the Hadoop FS directory to pick up the latest filters.

Reloading Filters
When a new Scan is requested on the server side, this module first checks the modification time of filters.jar. If the jar is unchanged, the previously loaded Classes are returned. If the jar has been updated, the module downloads it from HDFS again, creates a new instance of the classloader and reloads the classes from the modified jar. The previous classloader is dereferenced and left to be garbage collected. No HBase restart is required.
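A minimal sketch of that check-and-reload logic follows. It assumes a local file stands in for the jar (the module described above compares against HDFS and copies the jar down first), and it uses a plain parent-first URLClassLoader rather than the child-first loader from the example code. ReloadingJarModule and its methods are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical jar-backed filter module that swaps classloaders when the jar changes. */
class ReloadingJarModule {
    private final Path jar;            // local stand-in for the jar kept in HDFS
    private final ClassLoader parent;
    private long loadedMtime;
    private ClassLoader loader;
    private int reloads = 0;

    ReloadingJarModule(Path jar, ClassLoader parent) {
        this.jar = jar;
        this.parent = parent;
    }

    /** Returns the loader for the current jar, building a new one if the jar changed. */
    synchronized ClassLoader currentLoader() {
        try {
            long mtime = Files.getLastModifiedTime(jar).toMillis();
            if (loader == null || mtime != loadedMtime) {
                // The old loader is simply dropped; its classes become collectable once
                // no in-flight scan references them any more.
                loader = new URLClassLoader(new URL[] { jar.toUri().toURL() }, parent);
                loadedMtime = mtime;
                reloads++;
            }
            return loader;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** What a ProxyFilter-style caller would use to obtain a filter Class. */
    Class<?> loadFilterClass(String name) throws ClassNotFoundException {
        return Class.forName(name, true, currentLoader());
    }

    synchronized int reloadCount() { return reloads; }
}
```

Scans that started before a reload keep using the filter instances they already created; only scans that start afterwards see the new loader.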

[Figure: The HdfsJarModule keeps track of the latest custom filter definitions, using a separate classloader for each jar version.]

Custom classloading and reloading can be a minefield of class-linking errors and ClassCastExceptions, but the risk here is mitigated by the highly specialized use case of filtering. The filter is instantiated and configured per scan, and its lifetime is limited to the time it takes to run the scan on the regionserver. The example uses the child-first classloader described in our earlier post on ClassLoaders [2], which searches a configured set of URLs before delegating to its parent classloader.
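For readers who have not seen the earlier post, a child-first loader can be sketched in a few lines by overriding URLClassLoader.loadClass(String, boolean). This illustrates the technique under our own assumptions and is not the exact loader from [2]; ChildFirstClassLoader is a hypothetical name. It always delegates java.* classes to the parent, because the JVM refuses to define those from anywhere else.

```java
import java.net.URL;
import java.net.URLClassLoader;

/** Hypothetical child-first loader: search our own URLs first, then the parent. */
class ChildFirstClassLoader extends URLClassLoader {
    ChildFirstClassLoader(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null && !name.startsWith("java.")) {
                try {
                    c = findClass(name);            // child first: look in the filter jar(s)
                } catch (ClassNotFoundException notInOurUrls) {
                    // fall through to the parent
                }
            }
            if (c == null) {
                c = super.loadClass(name, false);   // standard parent-first path
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }
}
```

One design caution: if the filter jar bundles its own copy of HBase classes such as the Filter interface, a child-first loader will load a second copy, and a cast to Filter would then fail with exactly the kind of ClassCastException mentioned above. Keeping HBase out of the filter jar, or adding its packages to the parent-only check, avoids that.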

Things to watch out for

  • The example code has a performance overhead as it makes additional calls to HDFS to check for the modification time of the filter jar when a filter is first instantiated. This may be a significant factor for smaller scans. If so, the logic can be changed to check the jar less frequently.
  • The code is also very naïve at this point. Updating filters.jar in HDFS while a table scan is in progress can have undesired results if the updated filters are not backward compatible. Different versions of the jar can be picked up by the RegionServers for the same scan, as they check and instantiate the Filters at different times.
  • Mutable static variables are discouraged in the custom Filter because they will be reinitialized when the class is reloaded dynamically.
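One way to check the jar less frequently, as the first point suggests, is to rate-limit the modification-time lookup. This sketch (ThrottledCheck is a hypothetical name) takes an injectable clock so it can be tested without waiting; the trade-off is that a replaced jar may go unnoticed for up to one interval.

```java
import java.util.function.LongSupplier;

/** Hypothetical rate limiter for the jar freshness check. */
class ThrottledCheck {
    private final LongSupplier clockMillis;
    private final long intervalMillis;
    private boolean checkedBefore = false;
    private long lastCheckMillis;

    ThrottledCheck(LongSupplier clockMillis, long intervalMillis) {
        this.clockMillis = clockMillis;
        this.intervalMillis = intervalMillis;
    }

    /** True if the caller should look up the jar's modification time now. */
    synchronized boolean shouldCheckNow() {
        long now = clockMillis.getAsLong();
        if (!checkedBefore || now - lastCheckMillis >= intervalMillis) {
            checkedBefore = true;
            lastCheckMillis = now;
            return true;
        }
        return false;                      // reuse the cached Classes without touching HDFS
    }
}
```

In production the clock would be System::currentTimeMillis; a module would call shouldCheckNow() before the HDFS lookup and reuse its cached Classes whenever it returns false.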

Extensions
The example code is just a starting point for more interesting functionality tailored to different use cases. Scans using filters can also be used in MapReduce jobs and coprocessors. A short list of possible ways to extend the code:

  • The most obvious weakness in the example implementation is that the ProxyFilter only supports one jar. Extending it to include all jars in a filter directory would be a good start. [4]
  • Different clients may expect certain versions of Filters. Some versioning and bookkeeping logic will be necessary to ensure that the ProxyFilter can serve up the correct filter to each client.
  • Generalize the solution to include MapReduce scenarios that use HBase as the input source. The module can load the custom filters at the start of each map task from the MR job library instead, unloading the filters after the task ends.
  • Support other JVM languages for filtering. We have tried serializing Groovy 1.6 scripts as Filters but performance was several times slower.
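For the first extension, most of the change is in how the classloader’s URL list is built. A sketch that collects every *.jar in a local directory might look like this; FilterDirectory is a hypothetical name, and with HDFS the listing and copying would go through the Hadoop FileSystem API instead. Sorting makes the search order deterministic, which matters if two jars ever contain the same class.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: one classloader URL per jar in a filter directory. */
class FilterDirectory {
    /** Returns the URLs of all *.jar files in dir, sorted by path for a stable order. */
    static URL[] jarUrls(Path dir) {
        List<Path> jars = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, "*.jar")) {
            for (Path p : stream) {
                jars.add(p);
            }
            jars.sort(null);                       // natural Path ordering
            URL[] urls = new URL[jars.size()];
            for (int i = 0; i < urls.length; i++) {
                urls[i] = jars.get(i).toUri().toURL();
            }
            return urls;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The resulting array can be passed straight to a URLClassLoader constructor, or to a child-first loader like the one used in the example code.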


Using the ProxyFilter as a generic extension point for custom filters allows us to improve our scan performance without taking the hit of restarting our entire HBase cluster.

Footnotes
[1] Class Reloading Basics http://tutorials.jenkov.com/java-reflection/dynamic-class-loading-reloading.html
[2] See our blog post on ClassLoaders for alternate classloader delegation http://tech.flurry.com/using-the-java-classloader-to-write-efficient
[3] The classloader in our example resembles the one described in this ticket https://issues.apache.org/jira/browse/HBASE-1936
[4] A new classloader has just been introduced in hbase-0.92.2 for coprocessors, and it seems to fit our dynamic filters perfectly https://issues.apache.org/jira/browse/HBASE-6308
[5] Example code https://github.com/vallancelee/hbase-filter/tree/master/customFilters


Squashing bugs in multithreaded Android code with CheckThread

Writing correct multithreaded code is difficult, and writing Android apps is no exception. Like many mobile platforms, Android’s UI framework is single threaded and leaves it to the application developer to manage threads, with no thread-safety guarantees. If your app is more complicated than “Hello, World!”, you can’t escape writing multithreaded code. For example, to build a smooth and responsive UI, you will have to move long-running operations like network and disk IO to background threads and then return to the UI thread to update the UI.

Thankfully, Android provides some tools to make this easier, such as the AsyncTask utility class and the StrictMode API. You can also use good software development practices such as adhering to a strict code style and requiring careful review of code that involves the UI thread. Unfortunately, these approaches require diligence, are prone to human error, or only catch errors at runtime.

CheckThread for Android

CheckThread is an open source project authored by Joe Conti that provides annotations and a simple static analysis tool for verifying certain contracts in multithreaded programs. It’s not a brand new project and it’s not very difficult to use, but it hasn’t seen much adoption for Android apps. It offers an automated alternative to relying exclusively on comments and code review to ensure that no bugs related to the UI thread are introduced in your code. The annotations provided by CheckThread are @ThreadSafe, @NotThreadSafe and @ThreadConfined.

ThreadSafe and NotThreadSafe are described in Java Concurrency in Practice, and CheckThread enforces the same semantics that book defines. For the purposes of this blog post, the only annotation that we’ll be using is ThreadConfined.

Thread confinement is the general property of restricting access to data or code to a single thread. A data structure confined to the stack is inherently thread confined. A method that is only ever called by a single thread is also thread confined. In Android, updates to the UI must be confined to the UI thread. In very concrete terms, this means that any method that mutates the state of a View should only be called from the UI thread. If this policy is violated, the Android framework may throw a RuntimeException, but it may also simply produce undefined behavior, depending on the specific nature of the update to the UI.

CheckThread supports defining thread policies in XML files, so while it would be possible, it’s not necessary to download the source of the Android framework code and manually add annotations to it. Instead, we can simply define a general thread policy to apply to Android framework classes.

Time for an Example

The following example demonstrates how to declare a thread policy in XML, annotate a simple program and run the CheckThread analyzer to catch a couple of bugs.

CheckThread’s XML syntax supports patterns and wildcards, which allow you to concisely define policies for Android framework code. In this example we define two Android-specific policies. In general, this file would contain more definitions for other Android framework classes and could also contain definitions for your own code.

The first policy declares that all methods in Activity and its subclasses that begin with the prefix “on” should be confined to the main thread. Since CheckThread has no built-in concept of the Android framework or of the Activity class we need to inform the static analyzer which thread will call these methods.

The second policy declares that all methods in classes whose names end with “View” should be confined to the main thread. The intention is to prevent code that modifies the UI from being called from any thread other than the UI thread. This is a little conservative, since some methods in Android View classes have no side effects, but it will do for now.

https://gist.github.com/4113656

Having defined the thread policy, we can turn to our application code. The example app is the rough beginnings of a Hacker News app. It fetches the RSS feed for the front page, parses the titles and displays them in a LinearLayout.

This first version is naive; it does network IO and XML parsing in Activity.onCreate. This error will definitely be caught by StrictMode, and will likely just crash the app on launch, so it would be caught early in development, but it would be even better if it were caught before the app was even run.

https://gist.github.com/4113662

In this code, we call the static method getHttp in the IO utility class. The details of this class and method are not important, but since it does network IO, it should be called off the UI thread. We’ve annotated the entire class as follows:

https://gist.github.com/4113669

This annotation tells CheckThread that all the methods in this class should be called from the “other” thread.

Finally, we’re ready to run the static analyzer. CheckThread provides several ways to run the analysis tool, including Eclipse and IntelliJ plugins, but we’ll just use the Ant tasks on the command line. This is what CheckThread outputs after we run the analyzer:

https://gist.github.com/4113676

As you can see, CheckThread reports an error: we’re calling a method that should be confined to the “other” thread from a method that’s confined to “MAIN”. One solution to this problem is to start a new thread for the network IO. We create an anonymous subclass of java.lang.Thread and override run, inside of which we fetch the RSS feed, parse it and update the UI.

https://gist.github.com/4113683

We’ve annotated the run method to be confined to the “other” thread. CheckThread will use this to validate the call to IO.getHttp. After running the analyzer again, we discover that there’s a new error reported:

https://gist.github.com/4113686

This time, the error is caused by calling the method setText on a TextView from a different thread than the UI thread. This error is generated by the combination of our thread policy defined in XML and the annotation on the run method.

Instead, we can call Activity.runOnUiThread with a new Runnable that performs the UI update. The Runnable’s run method is annotated to be confined to the UI thread, since we’re passing it to a framework method that will call it from the UI thread.

https://gist.github.com/4113689
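CheckThread verifies this pattern statically. To show the same contract at runtime in plain Java, the sketch below substitutes a single-threaded executor for Android’s UI thread; FakeUi, setTitleText, runOnUiThread and FeedLoader are hypothetical stand-ins, not Android APIs. The worker thread does the network work, then hops back to the UI thread to touch the view, and the view method throws if it is called from any other thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for Android's UI thread plus one TextView (not Android APIs). */
class FakeUi {
    static final String UI_THREAD_NAME = "ui-thread";
    private final ExecutorService uiThread =
            Executors.newSingleThreadExecutor(r -> new Thread(r, UI_THREAD_NAME));
    private volatile String titleText = "";

    /** Like Activity.runOnUiThread: queue the Runnable for the UI thread. */
    void runOnUiThread(Runnable action) { uiThread.execute(action); }

    /** Like TextView.setText: confined to the UI thread, enforced here at runtime. */
    void setTitleText(String text) {
        String caller = Thread.currentThread().getName();
        if (!UI_THREAD_NAME.equals(caller)) {
            throw new IllegalStateException("setTitleText called from thread " + caller);
        }
        titleText = text;
    }

    String titleText() { return titleText; }

    /** Run any queued UI work, then stop the UI thread. */
    void shutdownAndWait() throws InterruptedException {
        uiThread.shutdown();
        uiThread.awaitTermination(5, TimeUnit.SECONDS);
    }
}

/** Hypothetical version of the fixed code: IO off the UI thread, view updates on it. */
class FeedLoader {
    /** Stand-in for IO.getHttp plus parsing; belongs on the "other" thread. */
    static String fetchTitles() { return "Front page titles"; }

    static Thread loadInBackground(FakeUi ui) {
        Thread worker = new Thread(() -> {
            String titles = fetchTitles();                   // runs on the worker thread
            ui.runOnUiThread(() -> ui.setTitleText(titles)); // hop back before touching the view
        }, "other");
        worker.start();
        return worker;
    }
}
```

A runtime guard like this only fires on the code paths a test actually exercises, which is the gap static analysis such as CheckThread aims to close.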

Finally, CheckThread reports no errors. Of course, that doesn’t mean the code is bug free; static analysis of any kind has limits. We’ve just gained some small assurance that the contracts defined in the XML policy and annotations hold. While this example is simple, and the code we’re analyzing would be greatly simplified by using an AsyncTask, it does demonstrate the class of errors that CheckThread is designed to catch. The complete sample project is published on GitHub.

The Pros and Cons of Annotations

One drawback that is probably immediately obvious is the need to add annotations to a lot of your code. This is because CheckThread’s static analysis is relatively simple and doesn’t construct a complete call graph of your code. As a result, the analyzer will not generate a warning for the code below:

https://gist.github.com/4113695

While this may appear to be a significant problem, it’s my opinion that in practice it is not actually a deal breaker. Java already requires that the programmer write most types in code. This is seen by some as a drawback of Java (and is often cited incorrectly as a drawback of static typing in general). However there are real advantages to annotating code with type signatures, and even proponents of languages with powerful type inference will admit this, since it’s usually recommended to write the type of “top-level” or publicly exported functions even if the compiler can infer the type without any hint.

The annotations that CheckThread uses are similar; they describe an important part of a method’s contract, that is whether it is thread safe or should be confined to a specific thread. Requiring the programmer to write those annotations elevates the challenge of writing correct multithreaded code to be at the forefront of the programmer’s mind, requiring that some thought be put into each method’s contract. The use of automated static analysis makes it less likely that a comment will become stale and describe a method as thread safe when it is not.

The Future of Static Analysis

The good news is that the future of static analysis tools designed to catch multithreaded bugs is looking very bright. A recent paper published by Sai Zhang, Hao Lü, and Michael D. Ernst at the University of Washington describes a more powerful approach to analyzing multithreaded GUI programs. Their work targets Android applications as well as Java programs written using other GUI frameworks. The analyzer described in their paper specifically does construct a complete call graph of the program being analyzed. In addition, it doesn’t require any annotations by the programmer and also addresses the use of reflection in building the call graph, which Android specifically uses to inflate layouts from XML. This work was published only this past summer, and the tool itself is underdocumented at the moment, but I recommend that anyone interested in this area read the paper which outlines their work quite clearly.

 


Write Once, Compile Twice, Run Anywhere

Many Java developers use a development environment different from the target deployment environment.  At Flurry, we have developers running OS X, Windows, and Linux, and everyone is able to contribute without thinking much about the differences of their particular operating system, or the fact that the code will be deployed on a different OS.

The secret behind this flexibility is how mature the Java toolchain has become. One tool in particular, Eclipse, has eclipsed the rest and become the dominant IDE for Java developers. Eclipse is free, offers integrations such as JUnit support and a host of really great plugins, and has become the de facto standard in Java development, displacing IntelliJ and other options. In fact, entry-level developers rarely even think about the compilation step, because Eclipse’s autocompilation recompiles your code every time you save a file.

There’s Always a Catch

Unfortunately, no technology is magical, and while this setup rarely causes issues, it can. One interesting case arises when the developer is using Eclipse with 1.6 compiler compliance and the target environment uses Sun’s 1.6 JDK compiler. For example, at Flurry we rely on Eclipse’s JDT compiler during development, but the code we ship is compiled for deployment on a Jenkins server by Ant using Sun’s JDK compiler. Note that both the developer and continuous integration environments are building for Java 6, but with different compilers.

Until recently this never came up as an issue as the Eclipse and Sun compilers, even when running on different operating systems, behave almost identically.  However, we have been running into some interesting (and frustrating) compiler issues that are essentially changing “Write Once, Run Anywhere” into “Write Once, Compile Using Your Target JDK, Run Anywhere.”  We have valid 1.6 Java code using generics, which compiles fine under Eclipse, but won’t compile using Sun’s javac.

Let’s See an Example

An example of the code in question is below. Note that it meets the Java specification and should be a valid program. In fact, in Eclipse using Java 1.6 compiler compliance the code compiles, but won’t compile using Sun’s 1.6 JDK javac.

https://gist.github.com/4035987

Compiling this code using javac in the Sun 1.6 JDK gives this compiler error:

https://gist.github.com/4036089

“Write Once, Run Anywhere” never made any promises about using different compilers, but the fact that our toolchain was using a different compiler than our build server never bore much thought until now.

Possible Solutions

The obvious solution is to have all developers work in the same environment the code will be deployed to, but this would deter developers from using their preferred environment and hurt productivity by constraining our development options. Possible solutions we have kicked around:

  1. Have Ant compile using the Eclipse incremental compiler (using the flags -Dbuild.compiler=org.eclipse.jdt.core.JDTCompilerAdapter and, of course, -Dant.build.javac.target=1.6). This sidesteps the problem by forcing the continuous integration system to use the same compiler as developer laptops, but it is not ideal, as this was never an intended use of the Eclipse compiler.
  2. Move to the 1.7 JDK for compilation, using a target 1.6 bytecode. This solves this particular issue, but what happens in the future?
  3. Change the code to compile under Sun’s JDK. This is not a bad option but will cost some speed of development found in the simplicity of Eclipse’s built in system. 
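For option 3, the usual way to code around an inference disagreement between compilers is to stop relying on inference at the failing call site and spell the type argument out. We can’t reproduce the code from the gist here, so the sketch below only shows the general technique on hypothetical helpers (TypeWitnessSketch, largest, largestId, namesOrEmpty); whether it resolves a particular javac 6 error has to be checked against that compiler.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.List;

/** Hypothetical helpers showing explicit type arguments ("type witnesses"). */
class TypeWitnessSketch {
    /** A generic method whose type variable the compiler normally has to infer. */
    static <T extends Comparable<? super T>> T largest(Collection<? extends T> items) {
        T best = null;
        for (T item : items) {
            if (best == null || item.compareTo(best) > 0) {
                best = item;
            }
        }
        return best;
    }

    static Integer largestId(List<Integer> ids) {
        // Explicit type argument: T is stated, not inferred.
        return TypeWitnessSketch.<Integer>largest(ids);
    }

    static List<String> namesOrEmpty(List<String> names) {
        // Same idea inside a conditional expression.
        return names != null ? names : Collections.<String>emptyList();
    }
}
```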

My experience has been that Eclipse is a well-worn path in the Java world, and it’s a little surprising that this hasn’t come up before for us given the heavy use of generics (although there are lots of generics issues logged over at bugs.sun.com, like http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6302954, which has come up for us as a related issue: the “no unique maximal instance exists” error message).

Switching to use the Eclipse compiler for our deployable artifacts would be an unorthodox move, and I’m curious if anyone out there reading this has done that, and if so, with what results.

We had a discussion internally, and the consensus was that moving to 1.7 for compilation with a target of 1.6 bytecode (#2) should work, but would potentially expose us to bugs in 1.7 (and would require upgrading some internal utility machines to support it). We aren’t quite ready to make the leap to Java 7, although it’s probably time to start considering the move.

For now, we have coded around the compiler issue, but it is coming up for us regularly now, and we are kicking around ideas on how to resolve it. In the near term, for the projects that run into this generics compile issue, developers are back to using good ole Ant to check whether their code will “really” compile. It’s easy to forget how convenient autocompilation has become, and that it isn’t really the same as the build server’s compiler.


Regression Testing NodeJS Services with Restify and Mocha

Here at Flurry we are big proponents of unit testing and TDD (Test-driven Development) – we believe it makes for tighter code that is less prone to behaving in unexpected ways.  We also believe in extensive Regression Testing – making sure the changes and code we add don’t break existing functionality.

Recently we have begun moving parts of the Flurry backend over to a new set of services running on NodeJS. NodeJS is a fast, flexible server-side JavaScript platform that is especially well suited to developing RESTful API services.

Restify

To help us build these new backend components we’re using the NodeJS “Restify” framework.  Restify provides an extensive amount of low-level functionality to help you correctly implement RESTful services – much in the same way the “Express” NodeJS module helps you rapidly develop web-based applications – for those of you familiar with Express, you’d feel right at home in Restify.  We’ve found Restify to be a fantastic framework for development – easy to use, understand and implement.

Mocha

To facilitate unit testing, we’re using the Mocha Javascript unit testing framework for everything we do and have found it to be flexible and easy to use.  Mocha makes it really easy to write unit tests for your NodeJS code – so easy in fact, we decided we wanted to use Mocha for our regression testing.

After some trial and error, we have settled on a fairly simple setup, which we have found works well and is easy to implement. The following steps outline the process; for this small tutorial we’ll build the requisite “Hello World” API that simply returns “Hello World” when called.

Before we get started, we first want to make sure that both Restify and Mocha are installed for use in our new Node project:

https://gist.github.com/3821529

Once those are installed, we’re ready to create our sample “Hello World” API service and set up the Mocha regression test cases.

Here’s the contents of the app.js file that we will be using for testing:

https://gist.github.com/3821547

You can see that unlike other app.js examples you may have seen, this one is very small and simply makes a function call to StartServer() which has been exported from the start_server.js file.  This function simply starts the server – we’ll cover that below.

Start up the NodeJS service in Mocha

Before we can do any regression testing against our “Hello World” service, we must first start up the Hello World service.  To do this, we are going to create a special “before” Mocha hook – this will run before any of the other regression test files are run by Mocha, and will allow us to start the service so it can be tested.

Within your project directory, create a subdirectory called “test”. All of our regression test and unit test files will be located inside it. Create a new file called “01-start-server.js” with the following contents:

https://gist.github.com/3821576

This file will be picked up as the first file in the directory (that’s why we started the filename with 01), and the before() function will be executed, which in turn requires and runs our StartServer() function.   The StartServer() function is defined in the start_server.js file:

https://gist.github.com/3821649

Its purpose is to initialize the Restify listeners and start the service.

Create a Mocha regression test

Now that we have a way to automatically start the service before we need to test it, we can go about the business of writing our regression test cases.  Since our Hello World service is so simple, we’re just going to have one test – we’re going to test to be sure our call to the /hello service returns an HTTP response code of “200”, indicating our request was “OK”:

https://gist.github.com/3821607

Initializing an HTTP Client

At the very top of our file that contains the test cases for our Hello World service you can see we are using a feature of Restify – the Restify JSON Client.  The JSON Client is a small wrapper around NodeJS’s http module that makes it easy to call URL based web services that return their content in JSON format.  The JSON client will automatically JSON.parse the response body and make it available for your use (as the “data” parameter in our callback function).

Once we’ve created our client, we can then use the client to make a GET call to our /hello service URL.  If we encounter an error connecting to the service, our “err” parameter will contain the error.  The “data” parameter will contain the data returned from the call, so we will want to check that to be sure it contains the data we requested. 

Running the Regression Test

Now that we have our test in place, the next step is to actually run it, which is as easy as typing “mocha” in your project directory:

https://gist.github.com/3821733

Mocha will first run the “01-start-server.js” file in the test directory – this starts our service.  Next, it will move on to the service_hello_tests.js file and run our solitary regression test.  If the service responds as we have outlined in our test, the test will be marked as passed by Mocha.
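For comparison, here is the same start-the-service-then-assert pattern as a Java analog. It is not Restify or Mocha: it uses the JDK’s built-in com.sun.net.httpserver server and HttpURLConnection, and HelloService and HelloRegressionCheck are hypothetical names. start() plays the role of StartServer() in the before() hook, and getStatus() plays the role of the JSON client’s GET in service_hello_tests.js.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.charset.StandardCharsets;

/** Hypothetical analog of start_server.js: start a "Hello World" service. */
class HelloService {
    static HttpServer start() {
        try {
            // Port 0 lets the OS pick a free port, so parallel test runs do not collide.
            HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
            server.createContext("/hello", exchange -> {
                byte[] body = "\"Hello World\"".getBytes(StandardCharsets.UTF_8);
                exchange.getResponseHeaders().set("Content-Type", "application/json");
                exchange.sendResponseHeaders(200, body.length);
                try (OutputStream out = exchange.getResponseBody()) {
                    out.write(body);
                }
            });
            server.start();
            return server;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

/** Hypothetical analog of the Mocha test: call the service and report the HTTP status. */
class HelloRegressionCheck {
    static int getStatus(HttpServer server, String path) {
        try {
            URI uri = URI.create("http://127.0.0.1:" + server.getAddress().getPort() + path);
            HttpURLConnection conn = (HttpURLConnection) uri.toURL().openConnection();
            try {
                return conn.getResponseCode();
            } finally {
                conn.disconnect();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```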

Using this simple setup we can add as many additional tests as needed – either extending our “hello” service, or writing additional test cases for new services.

Using Mocha for both unit testing and regression testing saves us time by letting us deal with only one testing framework. Mocha is flexible enough to make running unit tests in more complex scenarios fairly straightforward. Now if only it could write your unit tests for you 🙂

 


Scaling Build and Deployment Systems

Flurry is a rapidly growing company in every meaning of the word including customers, teams, and applications.  We have 650 million unique users per month from mobile applications that embed our SDKs which create over 1.6 billion transactions per day.  Our HBase cluster with over 1,000 nodes holds many petabytes of data and is rapidly growing.

While Flurry’s growth is great news for our customers, employees, and investors, it creates challenges for Release Management to support the growing number of applications, the growing number of developers, and the growing number of  development and test environments.  We need to manage all of that well and do so quickly and reliably.

In short, we need continuous integration to rapidly build, deploy, and test our applications.

Deployment Tools @ Flurry
 

To support our continuous integration, we set up three core tools.

  • Source Control: We use GitHub Enterprise to manage our source code and various configuration files. We use a variation of the Git Flow development process where all features are developed on individual branches and every merge is in the form of a pull request (which is also code reviewed).
  • Continuous Build: We use Jenkins to build code, deploy to our QA and test environments, and run our JUnit tests. Jenkins is set up to run the JUnit tests automatically when developers check in code and when they create pull requests for their branches. Jenkins also runs Selenium tests with SauceLabs every night against the QA environments and Production.
  • Task Tracking: We use Jira (with the Greenhopper agile plugin) for ticket management for planning and tracking enhancements and bug fixes.  All three tools are well integrated with various plug-ins that allow them to share information and to trigger actions.

Challenges at Scale
 
Our setup for continuous integration has served us well but has some challenges.

  • Too Many Jobs: We have more than 50 Jenkins jobs.  We have over 130 deployment scripts and more than 1,600 configuration files for the CI tools and applications.  Each new application and each new QA environment adds to the pile.  While we are whizzes at writing bash scripts and Java programs, this is clearly not scalable in the long term.
  • Slow Deployments: For security reasons, our Jenkins server cannot deploy war files and config files directly to Production servers.  For Production deployments, we run a Jenkins job that copies the files built by other jobs to a server in the data center over a locked-down, one-way secure tunnel.  Ops staff then manually run various scripts to push the files to the Production servers and restart them.  This is inefficient in terms of both time and people.
  • Test Overrun: Our JUnit test suite has over 1,000 test cases, which take about an hour to run.  With the increase in the number of developers, the test runs triggered by their pull requests are clogging the Jenkins build server. We release to Production biweekly and would like to release daily, or at least every few days.  The blocker is that the build, deploy, test, analyze, and fix cycle is too long to allow that.

Improving the Process: Design for Speed

The speed of an engineering team is directly related to the speed of release and deployment, so we needed to get faster. We have taken a number of steps to address these challenges.

  • We optimized our JUnit test cases by removing unnecessary sleep statements and stubbing out the deployments to our test CDN, which reduces network wait time.
  • We upgraded the build system to bigger, faster hardware and parallelized the JUnit test runs so that we can run multiple test jobs at the same time.
  • We added dedicated Jenkins slave servers that can share the burden during times of heavy parallel building. 

Overall we have reduced the time to run the entire test suite to 15 minutes.

To make it easier to manage the Jenkins jobs, we removed a lot of old jobs and combined others using parameterized builds.  We renamed the remaining Jenkins jobs to follow a standard naming convention and organized them into tabbed views.  We now have a dozen jobs laid out where people can find them.

All of the improvement steps have helped, but we needed more fundamental changes.

Improving the Process: RPM Based Software Deployments
 

We changed our build and deployment process to use RPM repositories, where every environment has its own local repository of RPMs.  In the new process, the Jenkins job builds the war files, then bundles each war file along with its install script into an RPM.  The job also builds RPMs for each application’s config files, email templates, and data files, and for the HBase and Hadoop config files.  Once all of the RPMs are built, the job rsyncs them to the target environment’s repo.  It then runs yum install over ssh on each server in the environment so that each server updates itself.  Once all the servers are updated, the job restarts all of them at once.  The job is parameterized so that users can build and deploy a single app, a set of apps, all apps, or just some config files.
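
The job's sequence can be sketched as a dry-run planner that only builds the list of commands. This is a minimal sketch, not Flurry's actual Jenkins job: the build/rpms/ directory, the repo host and path, the createrepo metadata refresh, and the Jetty restart command are all assumptions made for illustration.

```python
def plan_deployment(packages, hosts, repo_host, repo_dir,
                    restart_cmd="sudo /etc/init.d/jetty restart"):
    """Return, in order, the shell commands one run of the job would execute.

    packages: RPM names selected by the job parameters (one app, a set of apps,
    all apps, or only config RPMs). Paths and commands are hypothetical.
    """
    if not packages:
        raise ValueError("no packages selected")
    plan = [
        # Copy the freshly built RPMs to the environment's repo, once.
        f"rsync -av build/rpms/ {repo_host}:{repo_dir}/",
        # Assumption: refresh the repo metadata so yum can see the new RPMs.
        f"ssh {repo_host} createrepo --update {repo_dir}",
    ]
    pkg_list = " ".join(packages)
    # Each server pulls the selected RPMs from its environment's local repo.
    plan += [f"ssh {host} sudo yum install -y {pkg_list}" for host in hosts]
    # Restart only after every server has been updated.
    plan += [f"ssh {host} {restart_cmd}" for host in hosts]
    return plan


for cmd in plan_deployment(["app-web", "app-web-config"], ["qa1", "qa2"],
                           "repo.qa", "/srv/repo"):
    print(cmd)
```

The restarts are listed one per host here; a runner that wants to restart "all at once" could issue them in parallel once the install commands have all succeeded.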

The developers have access to Jenkins so that they can update the QA environments at will without having to involve Release Engineering.

The RPM-based build and deployment process gives us several advantages.  The install scripts are embedded into the RPMs which reduces the cluttered hierarchy of scripts called by the Jenkins jobs.  The same tools and processes for deployments in the Dev and QA environments can now be safely used in Production.

By having a repo for each environment, we only have to deploy the RPMs once, to that repo.  Each server in the environment then pulls the RPMs from its repo.  This saves a lot of time and network bandwidth for our remote environments, whose servers used to get files directly from Jenkins.

RPMs support dependencies, which instruct yum to deploy a group of other RPMs before deploying the given RPM.  For example, we can make an application’s RPM dependent on the application’s config file RPM, so that when we install the application, yum automatically installs the config file RPM.  The dependency feature also allows us to set up a parent RPM for each class of server, where the parent RPM depends on all of the application RPMs that run on that class of server.  We simply execute yum install with the parent RPM, and yum downloads and installs all of the application RPMs, and their dependent config file RPMs, needed for that server.  In the future we will add dependencies for Java, Jetty, and various OS packages to the parent RPMs.  This will allow us to kick-start a new server and fully provision it at the push of a button.
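
yum resolves these dependencies itself from each RPM's Requires: tags. To illustrate why installing one parent RPM pulls in everything beneath it, here is a simplified depth-first topological sort over a hypothetical package graph (the package names are made up); it is not yum's actual resolver.

```python
# Hypothetical dependency graph: parent RPM -> app RPMs -> config RPMs.
DEPENDS = {
    "server-class-web": ["app-dashboard", "app-api"],
    "app-dashboard": ["app-dashboard-config"],
    "app-api": ["app-api-config"],
    "app-dashboard-config": [],
    "app-api-config": [],
}


def install_order(root, depends):
    """Return packages so that every dependency comes before its dependents."""
    order, visiting, done = [], set(), set()

    def visit(pkg):
        if pkg in done:
            return                      # already scheduled (shared dependency)
        if pkg in visiting:
            raise ValueError(f"dependency cycle at {pkg}")
        visiting.add(pkg)
        for dep in depends.get(pkg, []):
            visit(dep)                  # schedule dependencies first
        visiting.discard(pkg)
        done.add(pkg)
        order.append(pkg)

    visit(root)
    return order


print(install_order("server-class-web", DEPENDS))
```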

Conclusion

As with any change in process and tools, there were a few gotchas.  The Jenkins slave server was easy to set up, but many of the tools and configurations needed to support our JUnit test runs had to be copied from the Jenkins master server.  We also found a few places where concurrent JUnit test runs stepped on common files.

Overall, the changes have sped up and cleaned up our build and deployments.  They have allowed us to better manage what we have and to handle our future growth.

Hardware Fault Detection and Planning

The Flurry Operations Team handles 4,428 spinning disks across 1,107 servers with a team of six awesome operations engineers.  Since Flurry is a startup, we don’t have an on-site tech team to handle all of the hardware issues that happen at the datacenter.  As we’ve grown from 400 disks to over 4,000, we’ve improved our process for handling servers with disk hardware failures.

The most common hardware alerts we receive come from Self-Monitoring, Analysis and Reporting Technology, better known as SMART.  This tool tests and monitors disks, and it detects and reports potential disk issues with the goal of warning the admin before a disastrous disk failure.  (Find out more about SMART errors.)

Flurry lives and dies by the data stored in our Hadoop and HBase cluster, so when a disk issue happens we need to respond quickly and decisively to prevent data loss and performance impacts.  We generally receive critical and non-critical alerts on around 1% of active cluster disks each month, not all of which need immediate attention.

Monitoring 400 disks: SMART error detected on host

When we were a humble cluster of 100 servers, it was easy to log into a box, gracefully stop the Hadoop services, unmount the drive, and start the Hadoop daemons back up.  Most of the actionable alerts we saw were High Read Errors or Uncorrectable Sectors, which tend to indicate potentially fatal disk issues.

Hadoop tends to let the filesystem handle marking sectors as bad or unreadable, which forces the read to occur on another replica.  Hadoop is pretty good about moving the block mapping, but this can increase read latency and generally degrades the overall performance of the cluster.  Did I already mention that we don’t like performance degradation?

Monitoring 1200 disks: Find those bad drives fast

Our first datacenter expansion, in 2011, consisted of a buildout of an additional 200 servers.  Each server has 4 x 1TB drives used by the cluster, so that’s 800 disks in this buildout.  During pre-production diagnostic tests, 0.5% of the new disks failed.

Once the initial issues were resolved, we deployed the servers into production.  Until our next datacenter buildout, the 200 new servers averaged 2.67 failing disks per month.  Our original 400 disks started reporting 2 new issues a month.  That’s a jump from 0.3% to 0.6% disk issues a month, possibly because the older disks were degrading with age.

Monitoring 2400 disks: Throwing more servers in the mix

Four months later, we needed to double our infrastructure to handle all of the new data that we were processing for our analytics.  This time, we were adding 1,200 new disks to the cluster, with the same amount of issues.  The pre-production diagnostic tests only shook out 0.02% of the bad disks.

At this time, our drive SMART check failures started increasing from <1% to 1.3% a month.  This was also during the Holiday App Boom as seen here and here. We were spending too much time ferrying drives back and forth between the office and the datacenter, and we started questioning our diagnostics, the urgency of our response to SMART errors, and our steps for replacing a drive.

Our servers have temperature sensors, which we started monitoring manually, and we noticed that the new servers were idling at around 84°F, a temperature at which we tend to see higher hardware failure rates.  We started graphing the temperatures and saw them increase to 89°F as we brought servers into production.  There was a lot we needed to do and not enough resources to do it, so all we could do was bug the NOCs to come up with strategies to bring us down to 77°F.

Monitoring 4800 disks: Finally some cooling relief

Ten months later, we once again doubled our infrastructure and migrated all of our servers into a new space where we now benefit from more efficient datacenter cooling.  Where we had averaged 77°F, we were now running between 60°F and 65°F.  Guess what happened to our average monthly SMART errors?  Since the move, they have gone down to 0.4%.  There may be several factors at play here:

  1. higher temperatures definitely seemed to contribute to higher failure rates
  2. we had a burn in time for those first 2400 disks
  3. the load across the disks had lightened after such a large expansion

Monitoring N disks: Scripts and processes to automate our busy lives

We’ve also improved our process for taking out servers with SMART alerts by creating a script that smartd calls when there’s an issue.  To automate this, we’ve allowed the smartd check to take out servers at will. With a small change to smartd.conf, the check calls our shell script, which runs a few checks and then gracefully stops the Hadoop and HBase processes. This spreads the existing data on the affected disks out to healthy servers. We’ve also included a check to make sure the number of servers we take down does not exceed our HDFS replication factor, which further reduces the risk of removing multiple replicas of the same data at once. Once all is complete, the script notifies the Operations team of the tasks performed or skipped. We have open sourced this script on our GitHub account here so you can fork and use it yourself.
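
A minimal sketch of the decision such a handler makes, assuming smartd is configured with its -M exec directive, which runs a program instead of sending mail and passes alert details in environment variables such as SMARTD_DEVICE and SMARTD_FAILTYPE. The handler path, the hosts_down input, and the threshold are assumptions; the open-sourced script is a shell script and may differ. The guard keeps the number of hosts out of service strictly below the replication factor, a conservative reading that leaves at least one live replica of every block even in the worst case.

```python
import os

# Hypothetical smartd.conf line that hands alerts to this handler instead of mail:
#   DEVICESCAN -a -m root -M exec /usr/local/sbin/smart_alert_handler
# smartd exports alert details (e.g. SMARTD_DEVICE, SMARTD_FAILTYPE) to the handler.


def safe_to_take_down(hosts_down, replication_factor):
    """Conservative guard: after taking this host down, the number of hosts out
    of service must stay strictly below the HDFS replication factor."""
    return hosts_down + 1 < replication_factor


def handle_smart_alert(env, hosts_down, replication_factor=3):
    """Decide what to do with one SMART alert; returns a line for the ops notice."""
    device = env.get("SMARTD_DEVICE", "unknown device")
    failtype = env.get("SMARTD_FAILTYPE", "unknown failure")
    if safe_to_take_down(hosts_down, replication_factor):
        # Placeholder: the real script gracefully stops Hadoop and HBase here.
        action = "stopping Hadoop/HBase daemons"
    else:
        action = "skipped: too many hosts already down"
    return f"{failtype} on {device}: {action}"


if __name__ == "__main__":
    # Assumption: hosts_down would come from the cluster (for example the
    # Namenode's dead-node count); 0 is only a placeholder.
    print(handle_smart_alert(os.environ, hosts_down=0))
```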

What about the physical disks? Instead of having an engineer go down and pull disks from each server, we plan on using our Remote Hands service to perform that task for us, so we can focus on handling the broader-reaching issues. There were times when we batched up disk collection and engineers would carry 30 drives back to the office (walking barefoot, uphill both ways).

As always, we’re trying to do things more efficiently.  A few improvements we have in the plans include:

  1. Having the script unmount the bad disk and bring the server back into production.
  2. Having the script email Remote Hands with the server, disk, location, and issue, so they can swap the bad drive.
  3. Once the disk is swapped, mounting the new drive and returning the server to production.
  4. Adapting the script to handle other hardware alerts and issues (network cards, cables, memory, mainboard).
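
Step 2 could be as simple as composing a message with Python's standard email package. This sketch is hypothetical: the function name, the example.com addresses, and the message format are placeholders, not Flurry's actual plan.

```python
from email.message import EmailMessage


def remote_hands_request(server, disk, location, issue,
                         to_addr="remote-hands@example.com",
                         from_addr="ops@example.com"):
    """Build (but do not send) a drive-swap request. Addresses are placeholders."""
    msg = EmailMessage()
    msg["To"] = to_addr
    msg["From"] = from_addr
    msg["Subject"] = f"Disk swap request: {server} {disk}"
    msg.set_content(
        f"Server: {server}\n"
        f"Disk: {disk}\n"
        f"Location: {location}\n"
        f"Issue: {issue}\n"
        "Please replace the drive and reply once it is swapped.\n"
    )
    return msg


msg = remote_hands_request("hdp-042", "/dev/sdc", "rack 12, slot 3",
                           "CurrentPendingSector")
print(msg)
# To send it, for example: smtplib.SMTP("localhost").send_message(msg)
```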

We’ve learned from those grueling earlier days, and continue to make hardware management a priority.  With a small team managing a large cluster, it’s important to lean on automating simple, repetitive tasks as well as utilizing the services you are already paying for.  I, for one, welcome our new robotic overlords.

What is Hadoop? Part I

Prologue: The Blind Men and the Elephant
As members of the Hadoop community, we here at Flurry have noticed that at many Hadoop-related gatherings, it is common for about half of the attendees to be completely new to Hadoop. Perhaps someone asked them “Should we use Hadoop for this?”, and they didn’t know how to answer. Maybe they had heard of Hadoop and planned to research it, but never found the time to follow up. Out in the Hadoop community at large, some of the answers I have heard to the question “What is Hadoop?” include comparing Hadoop to a tape drive and claiming that the most important thing about Hadoop is that it accelerates Moore’s Law.

I am reminded of the parable of the blind men and the elephant (which here is named Hadoop). Perhaps you have heard some variation of the story: five blind men come upon an elephant. Each touches a different part of the elephant and so describes the elephant in a manner that is partially correct yet misses the whole picture. One man touches the elephant’s leg – “An elephant is like a tree trunk” – another touches the elephant’s tail – “An elephant is like a snake”, and so on. In the end none of the men agree as they all see only parts of the whole.

Keep in mind that this is just one blind man’s description of the elephant as I answer “What is Hadoop?”

What is Hadoop?
Hadoop is a software library for distributed computing (using the map/reduce paradigm). Hence, if you have a distributed computation problem, then Hadoop may be your best solution. There are a wide variety of distributed computing problems; how do you know if Hadoop meets your needs?

While there are many possible answers to this question based on a variety of criteria, this post explores the following qualified answer: “If the problem you are trying to solve is amenable to treatment with the Map/Reduce programming model, then you should consider using Hadoop.” Our first stop, then, is the question “What is Map/Reduce?”

Ok, What is Map/Reduce?
Map/Reduce (or MapReduce, etc.) is a programming strategy/model for distributed computing that was first detailed in a research paper published by Google in 2004. The basic strategy is to assemble a variable number of computing nodes which run jobs in two steps. The structure of the data inputs and outputs to each step, along with the communication restrictions imposed on the steps, enable a simple programming model that is easily distributed.

The first step, Map, can be run completely independently on many nodes. A set of input data is divided up into parts (one for each node) which are mapped from the input to some results space. The second step, Reduce, involves aggregating these results from the independent processing jobs into the final result.

Classic examples of Map/Reduce style jobs include evaluating large aggregate functions and counting word frequencies in a large text corpus. In the former, data is partitioned and partial sums are calculated (Map), and then globally aggregated (Reduce). In the latter, data is partitioned and words are counted (Map), then summed (Reduce).
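
The word-count case can be sketched in a few lines of plain Python. This illustrates the programming model only and is not Hadoop code: the partitions list stands in for data split across nodes, and a single loop stands in for independent Map tasks.

```python
from collections import defaultdict


def map_words(partition):
    """Map step: emit (word, 1) for every word in one input partition."""
    return [(word.lower(), 1) for word in partition.split()]


def reduce_counts(word, counts):
    """Reduce step: sum every partial count emitted for one word."""
    return word, sum(counts)


def word_count(partitions):
    """Run the map step on each partition, group by word, then reduce."""
    grouped = defaultdict(list)
    for partition in partitions:          # on a cluster, one node per partition
        for word, count in map_words(partition):
            grouped[word].append(count)   # group values by output key
    return dict(reduce_counts(w, c) for w, c in grouped.items())


print(word_count(["the cat sat", "The dog"]))
# {'the': 2, 'cat': 1, 'sat': 1, 'dog': 1}
```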

Map operations take as input a set of key/value pairs and produce as output a list of key/value pairs. The input and output keys and values may (and generally do) differ in type. An important fact to notice is that, at the end of the Map step, there may be values associated with a single output key living on multiple Map compute nodes. In the Reduce step, all values with the same output key are passed to the same reducer (independent of which Map node they came from). Note that this may require moving intermediate results between nodes prior to the Reduce step; however, this is the only inter-node communication step in Map/Reduce. The Reduce step then takes as input a map-output key and a list of values associated with that key, and produces as output a list of resultant values.

To clarify, all Map tasks are independent of each other. All Reduce tasks are independent of each other. It is possible to have no-op tasks for either the Map or Reduce phase.
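
The guarantee that all values for one key reach the same reducer usually comes from partitioning on the key. The sketch below is in the spirit of Hadoop's default hash partitioning; the names are hypothetical, and zlib.crc32 is an assumption chosen so results are deterministic across runs (Python's built-in hash() of strings is randomized per process).

```python
import zlib
from collections import defaultdict


def partition(key, num_reducers):
    """Choose a reducer for a key; the same key always maps to the same reducer."""
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def shuffle(map_outputs, num_reducers):
    """Route (key, value) pairs from every map node to reducer inputs,
    collecting all values for a key in one list on one reducer."""
    reducers = [defaultdict(list) for _ in range(num_reducers)]
    for node_output in map_outputs:           # one list of pairs per map node
        for key, value in node_output:
            reducers[partition(key, num_reducers)][key].append(value)
    return reducers


# "cat" is emitted by both map nodes but lands on a single reducer.
map_outputs = [[("cat", 1), ("dog", 1)], [("cat", 1)]]
for i, inputs in enumerate(shuffle(map_outputs, 3)):
    print(i, dict(inputs))
```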

What does this have to do with Hadoop?
Hadoop brings together a framework that is specially designed for Map/Reduce computing, from the data storage up. The two foundational components of this framework are the Hadoop Distributed File System and the Hadoop Map/Reduce framework. These are probably the two parts of Hadoop that most people have heard about, and most of the rest of the framework is built on top of them.

The Hadoop Distributed File System (HDFS)
HDFS is the foundation of much of the performance benefit and simplicity of Hadoop. It is arguably also the component (technically an Apache sub-project) of Hadoop that is most responsible for Hadoop’s overall association with “big data”.

HDFS allows for reliable storage of very large files across machines in a cluster composed of commodity computing nodes. HDFS is a special-purpose file system and as such is not POSIX-compliant. For example, files, once written, cannot be modified. Files in HDFS are stored as a sequence of replicated blocks, all of which are the same size except the last one. The default block size is 64MB, but the block size and replication factor can be set at the file level.
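
As a worked example of the block layout, assuming the 64MB default and a replication factor of 3 (HDFS's usual default, not stated above), a 200MB file occupies three full 64MB blocks and one 8MB block, and roughly 600MB of raw disk across the cluster. The helper names are hypothetical.

```python
MB = 1024 * 1024
DEFAULT_BLOCK_SIZE = 64 * MB   # the 64MB default described above
DEFAULT_REPLICATION = 3        # assumption: HDFS's usual default replication factor


def block_sizes(file_size, block_size=DEFAULT_BLOCK_SIZE):
    """Sizes of the blocks a file occupies: all full-size except possibly the last."""
    if file_size < 0 or block_size <= 0:
        raise ValueError("file_size must be >= 0 and block_size > 0")
    full_blocks, remainder = divmod(file_size, block_size)
    return [block_size] * full_blocks + ([remainder] if remainder else [])


def raw_bytes_stored(file_size, replication=DEFAULT_REPLICATION):
    """Cluster-wide disk usage: every block is stored `replication` times,
    and a short last block only uses its actual size."""
    return file_size * replication


print([s // MB for s in block_sizes(200 * MB)])  # [64, 64, 64, 8]
print(raw_bytes_stored(200 * MB) // MB)          # 600
```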

HDFS has a master/slave architecture. The master node is called the Namenode, and it manages all of the filesystem metadata (names, permissions, and block locations). The Namenode hosts an interface for file system operations and determines the mapping of blocks to the slaves, called Datanodes.

Each node in the Hadoop cluster can run one Datanode, which manages local data block storage. The Datanodes serve data reads and writes directly, and they create, delete, and replicate blocks in response to commands from the Namenode.

When files have been written to HDFS, their blocks are spread over the Datanodes of the Hadoop cluster. Hadoop achieves high-performance parallel computing by taking the computation to the data on the Datanodes.

Hadoop Map/Reduce

Hadoop achieves high performance for parallel computation by dividing up Map tasks so that individual Map tasks can operate on data that is local to the node on which they are running. In simple terms, this means that the Map task runs on the same server that is running a DataNode that stores the data to be mapped. The overall number of Map tasks is determined by the number of blocks taken up by the input data.
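
Taking the computation to the data can be sketched as a greedy assignment that prefers a free node already holding a replica of the block. This is a simplified illustration with hypothetical names; the real Jobtracker scheduler also weighs rack locality, task slots, and heartbeats, and a single greedy pass is not guaranteed to find the most local assignment.

```python
def assign_map_tasks(block_locations, free_nodes):
    """Greedy data-local assignment.

    block_locations: block id -> set of Datanodes holding a replica.
    free_nodes: nodes with a free Map slot (one task per node in this sketch).
    Returns block id -> (node, is_local). Unassigned blocks wait for a later slot.
    """
    free = list(free_nodes)
    assignments = {}
    for block, replicas in block_locations.items():
        if not free:
            break                                  # no slots left this round
        local = [node for node in free if node in replicas]
        node = local[0] if local else free[0]      # fall back to a remote read
        free.remove(node)
        assignments[block] = (node, bool(local))
    return assignments


locations = {
    "blk_1": {"dn1", "dn2"},
    "blk_2": {"dn2", "dn3"},
    "blk_3": {"dn1", "dn3"},
}
print(assign_map_tasks(locations, ["dn1", "dn2", "dn3"]))
# {'blk_1': ('dn1', True), 'blk_2': ('dn2', True), 'blk_3': ('dn3', True)}
```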

A Map/Reduce job generally proceeds as follows:

  1. Split the input files and prepare input key/values from them.
  2. Run one Map task per input split unit. There is no communication between Map tasks.
  3. “Shuffle”: Group the output according to output keys and distribute to nodes that will run Reduce task(s). This is the only communication step.
  4. Run Reduce tasks.
  5. Write output.

Like HDFS, Hadoop Map/Reduce has a master/slave architecture. The master node is called the Jobtracker, and the slave nodes are called Tasktrackers. The Jobtracker distributes Map and Reduce tasks to Tasktrackers. Tasktrackers execute the Map and Reduce tasks and handle the communication tasks required to move data around between the Map and Reduce phases.

Specifically, for Hadoop Map/Reduce, jobs proceed as follows:

  1. Files are loaded from HDFS
  2. An implementation of the Hadoop interface InputFormat determines which files will be read for input. It breaks files into pieces which are implementations of the Hadoop interface InputSplit (typically FileSplit instances). A very large file may be split into several such pieces. A single Map task is created per split.
  3. Each Map task generates input key-value pairs by reading its FileSplit using the RecordReader instance provided by the InputFormat.
  4. Each Map task performs computation on the input key-value pairs. There is no communication between Map tasks.
  5. Each Map task calls OutputCollector.collect to forward its results to the Reduce task(s).
  6. (Optional) Run a Combiner on the Map node – this is a lot like a Reduce step, but it only runs on the data on one node. It is an optimization.
  7. Shuffle: outputs from Map tasks are moved to Reducers. Which Reducer receives each key can be controlled via a custom Partitioner class.
  8. Sort: the map results are sorted before passing to the Reduce step.
  9. The Reduce step is invoked. The reducer receives a key and an iterator over all map-output values associated with the key.
  10. Reduce invokes OutputCollector.collect to send final values to the output file.
  11. A RecordWriter instance outputs the results to files in HDFS.
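
Steps 4 through 10 can be simulated end to end to show why the Combiner is worth having: summing on the map node shrinks what the shuffle has to move. This is a minimal sketch in plain Python, not the Hadoop API, assuming word-count-style splits and a crc32-based partition in place of Hadoop's Partitioner.

```python
import zlib
from collections import defaultdict


def map_task(split):
    """Map: emit (word, 1) for each word in one input split."""
    return [(word, 1) for word in split.split()]


def combine(pairs):
    """Combiner: pre-sum values per key on the map node (an optimization)."""
    totals = defaultdict(int)
    for key, value in pairs:
        totals[key] += value
    return list(totals.items())


def run_job(splits, num_reducers=2, use_combiner=True):
    """Simulate map -> (combine) -> shuffle -> sort -> reduce.
    Returns (final counts, number of records moved in the shuffle)."""
    reducer_inputs = [defaultdict(list) for _ in range(num_reducers)]
    shuffled = 0
    for split in splits:                          # one Map task per split
        pairs = map_task(split)
        if use_combiner:
            pairs = combine(pairs)
        for key, value in pairs:                  # shuffle: route by partition
            r = zlib.crc32(key.encode("utf-8")) % num_reducers
            reducer_inputs[r][key].append(value)
            shuffled += 1
    results = {}
    for inputs in reducer_inputs:
        for key in sorted(inputs):                # sort: keys reach reduce in order
            results[key] = sum(inputs[key])       # reduce: total for this key
    return results, shuffled


splits = ["a b a", "b a c"]
print(run_job(splits, use_combiner=False))  # 6 records shuffled
print(run_job(splits, use_combiner=True))   # 5 records shuffled
```

The combiner is only safe here because addition is associative and commutative, so pre-summing on each map node cannot change the final counts.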

Note here that the storage architecture of HDFS drives the parallelization of the Map/Reduce job. The alignment of the data distribution and computing distribution enables the work in the Map phase to run locally to the data, which in turn makes Hadoop perform very well for this type of work. Note further that Hadoop Map/Reduce does not require that its input files reside in HDFS, nor is HDFS only usable by Map/Reduce programs. The two combine to produce a powerful and simple model for parallel computing.

Example
Now let’s put it all together with a simple example of a Hadoop MapReduce job. Both the Map and Reduce tasks are shown below:

https://gist.github.com/3444669

Those of you familiar with the Hadoop word count example will recognize that this code is similar. This example expects input files that are lists of IP addresses, one per line. It processes all the files in the (specified) input directory in HDFS using FileInputFormat. The Map task parses each line (passed in as input values) into the four numbers of an IPv4 address, and checks to see if the address is in the major IP address block for Vatican City. If the address is in the Vatican City IP block, the Mapper outputs the IP address as the output key and 1 as the output value. The Reducer receives all of the Map output values for a given IP address in the Vatican City IP block and sums up the counts for each.
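
Since the gist itself is not reproduced here, the following Python sketch mirrors the described logic. It is an approximation: it uses the standard ipaddress module instead of hand-parsing the four numbers, the function names are hypothetical, and TARGET_BLOCK is a placeholder (192.0.2.0/24, a documentation range from RFC 5737), not Vatican City's actual block.

```python
import ipaddress
from collections import defaultdict

# Placeholder only: substitute the real target block. 192.0.2.0/24 is reserved
# for documentation (RFC 5737) and is NOT Vatican City's address block.
TARGET_BLOCK = ipaddress.ip_network("192.0.2.0/24")


def mapper(line):
    """Emit (ip, 1) if the line holds an IPv4 address inside TARGET_BLOCK."""
    try:
        addr = ipaddress.IPv4Address(line.strip())
    except ValueError:          # AddressValueError subclasses ValueError
        return []               # skip blank or malformed lines
    return [(str(addr), 1)] if addr in TARGET_BLOCK else []


def reducer(ip, counts):
    """Sum the 1s emitted for one IP address."""
    return ip, sum(counts)


def run(lines):
    """Local stand-in for the job: map every line, group by IP, reduce."""
    grouped = defaultdict(list)
    for line in lines:
        for ip, one in mapper(line):
            grouped[ip].append(one)
    return dict(reducer(ip, c) for ip, c in grouped.items())


print(run(["192.0.2.7", "10.0.0.1", "192.0.2.7\n", "bogus", "192.0.2.200"]))
# {'192.0.2.7': 2, '192.0.2.200': 1}
```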

Conclusion
HDFS and Hadoop Map/Reduce combine in a supportive architecture to form a foundation for high-performance parallel computing on clusters of commodity machines. If you are trying to decide whether or not to use Hadoop for a project, asking yourself “Can we use Map/Reduce to attack this problem?” is a fruitful line of inquiry that can provide a compelling and defensible answer.

Epilogue: the Elephants and the Blind Man
Five elephants come across a blind man. The first elephant touches the man and declares, “Men are very flat”. All the other elephants agree.
