Tuesday, August 5, 2014

Using Apache Crunch with our own Avro files

I have been working in big data and machine learning for a while now, after moving on from Oracle Crystal Ball. Recently I started learning Apache Crunch for constructing and analyzing our data pipelines. If you work in big data but have not heard of Crunch before, check it out. If you are familiar with Cascading, Crunch is a very similar project, with certain differences.

We store our data (different types of events) in HDFS in Avro format. My 'Hello World' task to teach myself was to read a bunch of Avro files from HDFS and print out the count of events in those files. I started out following the example code mentioned in this blog post from Cloudera. I faced a couple of stumbling blocks along the way, and this post details how I resolved them.

Note: This post is written from the perspective of the CDH3u6 distribution.

One of my aims was to run my code directly, rather than passing the jar to hadoop (as in, hadoop jar MyProgram.jar ...). The reason is that I want to, at some point, throw in a UI for easy use. We will see whether that causes additional issues, but that is the motivation anyway.

First off, our HDFS namenode can only be reached from my machine through a double SSH tunnel. Agreed, this has nothing to do with Apache Crunch, but it meant I had to embed the configuration in the program itself. The double SSH tunnel essentially forwards the namenode to a local port. If you want to know the details of how to set this up, follow this link. Here is what the configuration part of my program looks like, using the tunnel to run the Crunch job:
    public static void main(String[] args) throws Exception
    {
        // Create a configuration object to load specific configurations.
        Configuration conf = new Configuration();
        // Route HDFS RPC traffic through the local end of the SSH tunnel.
        conf.set("hadoop.socks.server", "localhost:8020");
        conf.set("hadoop.rpc.socket.factory.class.default", "org.apache.hadoop.net.SocksSocketFactory");
        conf.set("fs.default.name", "hdfs://namenode.xxx.com:8020");
        conf.set("hadoop.tmp.dir", "/var/tmp/hadoop");
        // Run the tool.
        ToolRunner.run(conf, new DataLoader(), args);
    }
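To give a feel for the Crunch side, here is a minimal sketch of what the rest of DataLoader could look like. I am assuming it extends Configured and implements Tool, that the events are read as Avro generic records, and that the schema file name event.avsc is a placeholder rather than my actual code:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.crunch.PCollection;
import org.apache.crunch.PObject;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.From;
import org.apache.crunch.types.avro.Avros;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Tool;

public class DataLoader extends Configured implements Tool
{
    @Override
    public int run(String[] args) throws Exception
    {
        // Build a MapReduce-backed Crunch pipeline using the configuration set up in main().
        Pipeline pipeline = new MRPipeline(DataLoader.class, getConf());

        // Load the Avro schema for our events ("event.avsc" is a placeholder name).
        Schema schema = new Schema.Parser().parse(
                getClass().getResourceAsStream("/event.avsc"));

        // Read every Avro file under the input path as generic records.
        PCollection<GenericData.Record> events = pipeline.read(
                From.avroFile(new Path(args[0]), Avros.generics(schema)));

        // Count the records; getValue() forces the pipeline to run.
        PObject<Long> count = events.length();
        System.out.println("Number of events: " + count.getValue());

        pipeline.done();
        return 0;
    }
}

With the main() method above, running the program submits this pipeline through the SSH tunnel and prints the event count.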
Moving on to the next bummer: the first time I tried to compile and run my program, I got the following exception:
Exception in thread "main" org.apache.hadoop.ipc.RPC$VersionMismatch: Protocol org.apache.hadoop.hdfs.protocol.ClientProtocol version mismatch. (client = 61, server = 63)
at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:401)
at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:379)
at org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:118)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:222)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:187)
at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:89)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1328)
There were a few more lines before the above, showing another exception, but this one was listed as the root cause. Searching the Internet for the error message yielded the following reason: the Hadoop server version I am using is newer than the Hadoop client libraries that one or more of my dependencies pull in. A simple solution is suggested in both this link and this link: add the Hadoop libraries to the program classpath and move them to the top of the classpath stack. I had a local Hadoop client set up which I was using for tests, and it was easy to point the classpath at it. Unfortunately, in IntelliJ, the IDE I use, the procedure changes with almost every version! Here is a link describing how to do this for version 13.

Even after doing this, I got another exception: [Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/thirdparty/guava/common/collect/LinkedListMultimap]. This was resolved by following this link and adding the missing jar file to the same library group as above.

That was that: I got the expected output count. Hello Crunch!

Update [8th Sept, 2014]
Before I could do any more cool stuff, our cluster got upgraded to CDH5 and I had to set up my stuff all over again. This time it was much easier though. Here are the notes:
  • Had to update the pom.xml to pull in the new Maven artifacts. Note that the Hadoop website doesn't contain any information about the Maven repositories, but a search yielded the following website, which contains all the information: http://mvnrepository.com/artifact/org.apache.hadoop
    • This version of Hadoop doesn't provide the hadoop-core artifact that I was using for CDH3 (which was Hadoop v0.20.2). I switched the reference to the "hadoop-client" artifact, version 2.5.0 (the latest as of this writing).
    • Also, the Crunch version had to be upgraded to work with Hadoop 2.x: I used "crunch-core" version 0.10.0-hadoop2. (A sketch of the resulting dependency section follows these notes.)
  • Changed the namenode information in the Java code.
  • Removed the classpath reference to the local Hadoop client (which I have also upgraded to CDH5). No RPC error this time.
    • Initially I forgot to remove the reference to the local Hadoop jars (as mentioned in the original post), which still pointed to Hadoop 0.20.2 libraries. I was then getting the following error: "Server IPC version 9 cannot communicate with client version 4".
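For reference, the dependency section of the pom.xml ended up looking roughly like the sketch below. Only the artifacts and versions mentioned above are from my actual setup; the groupIds are the standard Apache ones:

<dependencies>
    <!-- Hadoop 2.x client libraries; replaces the old hadoop-core artifact -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.5.0</version>
    </dependency>
    <!-- Crunch built against Hadoop 2.x -->
    <dependency>
        <groupId>org.apache.crunch</groupId>
        <artifactId>crunch-core</artifactId>
        <version>0.10.0-hadoop2</version>
    </dependency>
</dependencies>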