Sunday, September 2, 2018

Reading resource files in Spark using Scala

Often while coding up unit tests in Scala, I need to read from a file that is available in the resources folder. There are a few variations on how this can be done, especially if I am using the contents of the file as a DataFrame in Spark. Here are some examples that I want to keep for myself as notes.

All these examples work with JDK 1.8u144, Scala 2.11.8 and Spark 2.3.1. Test them out if your software versions differ substantially from these.
  • Get the contents of the text file as Iterator:

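    // In the path below, "/" refers to the root of the resources folder
    import scala.io.Source
    val fileStream = Source.fromInputStream(getClass.getResourceAsStream("/path/to/file.txt"))
    val iterator: Iterator[String] = fileStream.getLines()
    // Print the lines
    iterator.foreach(println)
    fileStream.close()
    // To get the complete contents of the file in a string, open a fresh Source,
    // since the iterator above has already consumed the first stream
    val contentSource = Source.fromInputStream(getClass.getResourceAsStream("/path/to/file.txt"))
    val contents = try contentSource.getLines().mkString("\n") finally contentSource.close()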

  • Get a text file as Spark RDD[String], individual lines as rows:

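    // In the path below, "/" refers to the root of the resources folder
    // (assumes a SparkConf named conf is already defined)
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SparkSession
    import scala.io.Source
    val session: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    val fileStream = getClass.getResourceAsStream("/path/to/file.txt")
    val input: RDD[String] = session.sparkContext.makeRDD(Source.fromInputStream(fileStream).getLines().toList)
    // Another shorter alternative; like the read.json example further down, it works from
    // the IDE or the command line, but not when the file is packaged inside a jar:
    val inputAlt: RDD[String] = session.sparkContext.textFile(getClass.getResource("/path/to/file.txt").getPath)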

  • Get a text file as a [n x 1] Spark DataFrame with individual lines as rows:

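    // In the path below, "/" refers to the root of the resources folder
    import org.apache.spark.sql.{DataFrame, SparkSession}
    val session: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    // toDF on an RDD needs the session's implicits in scope
    import session.implicits._
    val input: DataFrame = session.sparkContext.textFile(getClass.getResource("/path/to/file.txt").getPath).toDF("text")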

  • Read a line-delimited JSON file into a Spark DataFrame:

    // In the path below, "/" refers to the root of the resources folder
    import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
    import scala.io.Source
    val session: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    // The line below works when the file is in the resources folder and the program is run
    // through the IDE or the command line. However, it doesn't work when packaged in a jar:
    // it results in org.apache.spark.sql.AnalysisException: Path does not exist
    val input: DataFrame = session.read.json(getClass.getResource("/path/to/json/file").getPath).select("text")
    // The following works when the resource file is packaged in a jar
    // (read.json(Dataset[String]) is available from Spark 2.2 onwards)
    import session.implicits._
    val ds: Dataset[String] = session.createDataset[String](Source.fromInputStream(getClass.getResourceAsStream("/path/to/json/file")).getLines().toList)
    val inputFromJar: DataFrame = session.read.json(ds).select("text")

  • The final one is a niche use case where we have a bunch of events in an avro file, and would like to read the events. In this case, we use an iterator style, i.e. we stream the file lazily. For more notes on this, see here.

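    // In the path below, "/" refers to the root of the resources folder; Event is the Avro-generated class
    import java.io.File
    import java.net.URI
    import java.nio.file.Paths
    import org.apache.avro.file.DataFileReader
    import org.apache.avro.specific.SpecificDatumReader
    import scala.collection.JavaConverters._
    // Note: Paths.get(URI) only works when the resource is a plain file, not inside a jar
    val resourceURI: URI = getClass.getResource("/path/to/events.avro").toURI
    val file: File = Paths.get(resourceURI).toFile
    val datumReader = new SpecificDatumReader[Event](classOf[Event])
    val dataFileReader = new DataFileReader[Event](file, datumReader)
    // DataFileReader behaves like an iterator, i.e., a stream of events.
    // E.g., if we wanted to filter a few events into a separate collection:
    def eventFilter(event: Event): Boolean = ??? // placeholder predicate
    val specialSet: List[Event] =
      try (dataFileReader: java.util.Iterator[Event]).asScala.filter(eventFilter).toList
      finally dataFileReader.close()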
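Several of the snippets above open a resource stream without closing it, and the getPath variants stop working once the file is inside a jar. A small helper along the lines of the sketch below avoids both problems. It is only a sketch: ResourceUtil, readLines and resourceLines are hypothetical names, it assumes the files are UTF-8, and it sticks to the Scala standard library (try/finally rather than scala.util.Using, which does not exist in Scala 2.11).

```scala
import java.io.InputStream
import scala.io.{Codec, Source}

// Hypothetical helper object; not part of Spark or the Scala standard library.
object ResourceUtil {

  // Reads every line of the stream as UTF-8, then closes the stream
  // (closing the BufferedSource closes the underlying InputStream).
  def readLines(in: InputStream): List[String] = {
    val source = Source.fromInputStream(in)(Codec.UTF8)
    try source.getLines().toList // toList forces the lazy iterator before close()
    finally source.close()
  }

  // Loads a classpath resource by absolute path ("/" = root of the resources
  // folder). Returns None when the resource is missing instead of failing with
  // a NullPointerException. Because it reads a stream rather than a file-system
  // path, it also works when the resource is packaged inside a jar.
  def resourceLines(path: String): Option[List[String]] =
    Option(getClass.getResourceAsStream(path)).map(readLines)
}
```

In a Spark test, the lines can then be handed to the JSON reader the same way as in the jar-friendly example above, e.g. `session.read.json(session.createDataset(ResourceUtil.resourceLines("/path/to/json/file").getOrElse(Nil)))` with `import session.implicits._` in scope.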

Update (Sep 16, 2018): Added way to read JSON file in Spark through read.json
Update (Jan 05, 2019): Updated title

Wednesday, December 6, 2017

DisplayLink again: sun-dried adapter, anyone?

I have written about DisplayLink technology before (the USB 3.0 adapter I use for projecting my laptop onto dual screens), including my tricks for getting the device working when it doesn't want to work. As it turns out, I was not even on the right track in that post, and how the discovery came about is pretty hilarious.

Some days back, the Targus DisplayLink adapter (model ACP70US) completely stopped working: no matter how many times I power-cycled it, the monitors were not detected (although the mouse and keyboard were detected every time, obviously). Initially I thought it was because of a Win 10 update that had recently been pushed. I tried switching driver versions, but that didn't help either. Desperate, I wrote in the DisplayLink support forum, and the helpful admin took me through a few debugging steps.

It turns out that my docking station has an issue with its USB 3.0 hub. Specifically, the temperature tolerance of the USB 3.0 hub used in the docking station is at issue here. When the docking station is used from a cold state (e.g., when it has not been used for a few hours), or when the ambient temperature is low, it has trouble starting up because the temperature is not within tolerance. It seems this issue was corrected by the manufacturer (Targus) in a subsequent revision or batch. Now, this product was originally purchased in the US, but I am now located in Bangalore, India. For what it's worth, it had actually been raining here for the last couple of days, and it was definitely a few degrees colder (about 3-4 degC colder).

Indeed, it was sunny here in Bangalore today, and I baked the device in the sun for some time. Lo and behold, it started working!!

This has to be the coolest trick I have ever encountered with computer peripherals. Oh well!! 

Sunday, March 5, 2017

Using HTCondor for grid computing

For a project I am experimenting with, I wanted to run multiple instances of a program in parallel, each instance dealing with different input data. An example to demonstrate the use case: let's say I have a folder full of movies in wmv format, all of which I want to convert to mp4 format using the ffmpeg program. I would like to run as many of these conversion jobs in parallel as possible, possibly on multiple computers.

Given my current exposure to big data solutions like Hadoop/Spark, I initially thought that we could, for example, use the YARN scheduler and run these jobs on a Spark cluster. But my research indicated that these use cases are not really served by YARN/Spark: these tasks are not cloud computing tasks, but grid computing tasks.

Now, given my graduate days at the University of Wisconsin, Madison, grid computing for me means condor (currently called HTCondor). I used condor for my Ph.D. research, and it was awesome. So I took a look and gave it a try. Since I had never set up a cluster before, here are some notes on setting up a cluster (aka grid) for these jobs.

Note that this is not really a Getting Started guide; it doesn't show, for example, sample jobs or sample submission files. For that purpose, follow the manual.

Machines Setup

I am currently using a cluster of 3 VMs, each running CentOS 6, with v8.6.0 of HTCondor; the manual is available here.

HTCondor Setup

Here are the commands to install HTCondor on each of these CentOS 6 systems, to be run as the root user. The original instructions can be found here.

cd /etc/yum.repos.d/
rpm --import RPM-GPG-KEY-HTCondor
yum install condor-all

  • CentOS <version> is equivalent to RHEL <version>
  • You might have to clear the yum cache using ‘yum clean all’
  • Once the installation is successful, the binaries go into /usr/bin (/usr becomes the “installation folder”), and the config files go into /etc/condor
  • As part of the installation process, user & group ‘condor’ is created.
Here are the corresponding commands to install HTCondor on Debian systems (I ended up setting it up on my VirtualBox VM for experimenting while on the go, and I run a Debian distro there: Bunsen Labs Linux). Again, to be run as the root user. The original instructions can be found here.

echo "deb jessie contrib" 
                                                     >> /etc/apt/sources.list
wget -qO - 
                                                     | sudo apt-key add -
apt-get update
apt-get install -t jessie condor

HTCondor Configuration

  • The configuration lives in condor_config (and condor_config.local) in /etc/condor
  • One of the most important gotchas when configuring HTCondor is whether reverse DNS lookup works properly on your hosts, meaning that ‘nslookup <your ip address>’ returns your full host name. If it does, a lot of trouble is saved; otherwise, you have to add additional configuration as required.
  • Changed the following configurations (first two because of the above reason):
    • For all boxes in the grid, including CM (Central Manager):
      • condor_config: ALLOW_WRITE = *
    • For all boxes in the grid, including CM (Central Manager), if you want to use it for computations:
      • condor_config.local: TRUST_UID_DOMAIN=true
    • For boxes other than the CM, point to the correct CM IP address, or CM host name if reverse DNS is working.
      • condor_config: CONDOR_HOST = x.x.x.x
    • For Debian executor boxes, blank out the BASE_CGROUP variable. More notes here.
      • condor_config.local: BASE_CGROUP =  
    •  If you do not want the CM box to run jobs, do not run the STARTD daemon. More information here.
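Collected in one place, the settings above might look like the following condor_config.local fragment. This is a sketch rather than a tested configuration: x.x.x.x remains the placeholder for the CM address, and since condor_config.local is read after condor_config, values placed there override the global file. The DAEMON_LIST line for a CM that should not run jobs is an assumed way of leaving out STARTD; compare it with the DAEMON_LIST already defined in your condor_config, as the default list can vary between versions.

```
# condor_config.local on an execute node (placeholder values)
# Point at the central manager (CM)
CONDOR_HOST = x.x.x.x
# Relaxed because reverse DNS lookup does not work on these hosts
ALLOW_WRITE = *
TRUST_UID_DOMAIN = true
# Debian execute nodes only: leave BASE_CGROUP blank so cgroups are not used
BASE_CGROUP =

# condor_config.local on a CM that should NOT run jobs:
# the usual daemons, minus STARTD
DAEMON_LIST = MASTER, COLLECTOR, NEGOTIATOR, SCHEDD
```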

Running HTCondor on a Single Node

We will start with running HTCondor on a single node, which will eventually become our central manager.
  • Start the condor daemons:
    • On CentOS 6: /sbin/service condor start
    • On Debian Jessie: service condor start OR /etc/init.d/condor start
  • Check if the correct processes are running.

[root@... condor]# ps -elf | egrep condor_
5 S condor      4200       1  0  80   0 - 11653 poll_s 14:53 ?        00:00:00 condor_master -pidfile /var/run/condor/
4 S root        4241    4200  0  80   0 -  6379 poll_s 14:53 ?        00:00:00 condor_procd -A /var/run/condor/procd_pipe -L /var/log/condor/ProcLog -R 1000000 -S 60 -C 498
4 S condor      4242    4200  0  80   0 - 11513 poll_s 14:53 ?        00:00:00 condor_shared_port -f
4 S condor      4243    4200  0  80   0 - 16380 poll_s 14:53 ?        00:00:00 condor_collector -f
4 S condor      4244    4200  0  80   0 - 11639 poll_s 14:53 ?        00:00:00 condor_negotiator -f
4 S condor      4245    4200  0  80   0 - 16646 poll_s 14:53 ?        00:00:00 condor_schedd -f
4 S condor      4246    4200  0  80   0 - 11760 poll_s 14:53 ?        00:00:00 condor_startd -f
0 S root        4297    4098  0  80   0 - 25254 pipe_w 14:54 pts/0    00:00:00 egrep condor_

  • Check that condor_status returns correctly, with output similar to this:

      [root@... condor]# condor_status
      Name      OpSys      Arch   State     Activity LoadAv Mem   ActvtyTim

      slot1@... LINUX      X86_64 Unclaimed Idle      0.080 7975  0+00:00:0
      slot2@... LINUX      X86_64 Unclaimed Idle      0.000 7975  0+00:00:2

                          Machines Owner Claimed Unclaimed Matched Preempting  Drain

             X86_64/LINUX        2     0       0         2       0          0      0

                    Total        2     0       0         2       0          0      0

    • If condor_status does not return anything, it is probably because reverse DNS lookup is not working on the system. Follow the details in this thread, and start by setting the ALLOW_WRITE variable in the condor_config file to *.
    • My CentOS 6 VM is dual core with 16GB of RAM, so HTCondor automatically breaks this up into 2 slots, each having 1 core/8GB RAM.

I experimented directly with a java universe job, but a vanilla universe job can be tried as well. Once the daemons are running and condor_status returns correctly, switch back to your regular user login and submit the job using ‘condor_submit <job props file>’. If the job takes a while to run, condor_status shows it:

      [root@akka01-samik condor]# condor_status
      Name      OpSys      Arch   State     Activity LoadAv Mem   ActvtyTim

      slot1@... LINUX      X86_64 Claimed   Busy      0.000 7975  0+00:00:0
      slot2@... LINUX      X86_64 Unclaimed Idle      0.000 7975  0+00:00:2

                          Machines Owner Claimed Unclaimed Matched Preempting  Drain

             X86_64/LINUX        2     0       1         1       0          0      0

                    Total        2     0       1         1       0          0      0

The condor_q command shows it too:
      [root@... condor]# condor_q

      -- Schedd: ... : <x.x.x.x:42646> @ 02/24/17 14:23:43
      samik.r CMD: java    2/24 07:38      _      1      _      1 2.0

      1 jobs; 0 completed, 0 removed, 0 idle, 1 running, 0 held, 0 suspended

  • Because of the reverse DNS problem, when I first submitted the job it was put on hold, and condor_status and condor_q showed as much. Here are the steps I followed for debugging:
    • I followed the instructions available here, specifically using the ‘condor_q -analyze <job id>’ command.
    • I looked at the log file StarterLog.slot1.
  • Another time, I got a ClassNotFoundException, since the jar file was not available at the proper place. This error showed up in the job's output folder, in the error file (named in the condor submission file).

Running HTCondor on multiple nodes

Once things work on a single node, it is easy to add more nodes to the cluster: just install the HTCondor package, change the configuration as described above, and start the service.

Notes for running jobs

Some points that were not very obvious to me even after reading the manual:
  • In vanilla universe jobs, if the executable reads from standard input, use the input command; otherwise, use the arguments command. Consider the following snippet:
      executable = testprog
      input = input_file

      For this snippet, condor tries to execute something like: “testprog < input_file”. If this is how testprog is supposed to run, great. Otherwise, this will not work. On the other hand, if what you really want is “testprog input_file”, then the following snippet should work.

      executable = testprog
      should_transfer_files = yes
      # Define input file
      my_input_file     = input_file
      transfer_input_files = $(my_input_file)
      arguments      = "<other args if required> $(my_input_file)"

      Note that this possibly works a little differently in the java universe, where the input file name shows up as an argument to the executable (java).
  • For executables that are available at the exact same path on all the nodes in the cluster, it is recommended to specify the full path and switch off executable transfer:

      executable     = /path/to/executable
      transfer_executable = false
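Putting these notes together for the wmv-to-mp4 example from the beginning of this post, a submit description might look like the sketch below. It is untested and assumes ffmpeg is installed at /usr/bin/ffmpeg on every execute node; the variable name movie and the output and log file names are hypothetical. The `queue ... matching files` form queues one job per file matching the pattern, and because ffmpeg takes its input file as an argument rather than on standard input, the file is passed through transfer_input_files and arguments instead of input.

```
# Hypothetical submit description: one ffmpeg job per .wmv file
universe                = vanilla
# ffmpeg is assumed to exist at this path on every node, so don't transfer it
executable              = /usr/bin/ffmpeg
transfer_executable     = false
should_transfer_files   = yes
when_to_transfer_output = ON_EXIT
# Input is an argument to ffmpeg, not standard input
transfer_input_files    = $(movie)
arguments               = "-i $(movie) $(movie).mp4"
output                  = $(movie).out
error                   = $(movie).err
log                     = convert.log
queue movie matching files *.wmv
```

With this naming, the converted files come back as, e.g., movie1.wmv.mp4; stripping the .wmv part is left out to keep the sketch short.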

Update (March 9, 2017): Added Debian Jessie related commands, and formatting changes.
Update (April 15, 2017): Added more Debian notes.
Update (June 3, 2018): Added notes about CM.

Saturday, December 31, 2016

Using ssh tunnels for launching hadoop or spark jobs

This is a rather niche topic, so if you are here, you have probably weighed your options, talked with your colleagues, and have good reason to do just this. Hence, without further ado, let's get right into the topic.

As mentioned in another post, I use an ssh double tunnel and a SOCKS proxy to launch hadoop commands. That works quite well. The commands I use to set up the ssh double tunnel and the SOCKS host are:
  • ssh -4 -A -f -N -i "</path/to/keyfile>" username@intermediate-host -L5522:<final_host>:22
  • ssh -f -ND 8020 -i "</path/to/keyfile>" samikr@localhost -p 5522
Next, I can run commands like:
  • hadoop fs -ls hdfs:///user/samik
Note that, in order to do this, I need the following:
  • The hadoop distribution needs to be available locally
  • The version of the local hadoop distribution needs to exactly match the server-side version
  • Appropriate configuration files
I initially thought I would extend this method by keeping the spark distribution locally and launching spark jobs on the cluster this way. I progressed quite a bit, but hit an insurmountable roadblock in the form of a bug in the spark library; the bug text explains the issue in detail. However, if your remote machine (where you are trying to execute spark-submit from) can resolve the hadoop namenodes and the YARN master, check out this page and try it out; it might work for you.

Instead, I had to revert to the painful process of uploading the jar every time I changed something in the program, which is what I wanted to avoid in the first place. But the method is quite clean; here are the steps to achieve this.
  • Create a single jar of your spark code
    • One caveat here: if your dependencies include one or more signed jars, you will have to either manually edit the jar to remove the signature files, or use maven settings to exclude them. Otherwise, you will get the following error when you run the code on the cluster: Exception in thread "main" java.lang.SecurityException: Invalid signature file digest for Manifest main attributes. Follow this StackOverflow thread for more details. The recommended process here is to build the jar with a maven file that uses the shade plugin.
  • Push the jar to your hadoop user folder
    • This is one way to make the jar available to the YARN executor.
      • ssh -i "<key file>" samikr@localhost -p 5522 "/usr/bin/hadoop fs -put - hdfs:///user/samikr/<jarname>.jar" < /path/to/jar/file
    • You might have to delete the existing jar file first.
      • ssh -i "<key file>" samikr@localhost -p 5522 "/usr/bin/hadoop fs -rm -skipTrash hdfs:///user/samikr/<jarname>.jar"
  • Push the spark-hadoop jar to the hadoop user folder as well. Even better, run a sample spark application (e.g., this page shows running SparkPi in HDP) using spark-submit on the remote host, and note down the path to the spark-hadoop jar that is being used; it should typically already be available somewhere in HDFS. Again, the reason is to avoid copying this jar every time the job is launched.
  • Finally, launch the job:
    • Command:
      ssh -i "<key file>" samikr@localhost -p 5522 "/usr/bin/spark-submit --class <complete.classname.for.ExecutableClass> --master yarn-cluster --driver-memory 4g --executor-memory 2g --executor-cores 1 --conf spark.yarn.jar=hdfs://production1/hdp/apps/x.x.x/spark/spark-hdp-assembly.jar hdfs:///user/samikr/<jarname>.jar"
    • In the above command, “--conf spark.yarn.jar=hdfs://production1/hdp/apps/x.x.x/spark/spark-hdp-assembly.jar” specifies the location of the spark-hadoop jar in HDFS. This part may not be required when the command is run through an ssh tunnel; check whether your setup needs it.

To launch hadoop jobs (or yarn jobs, for that matter), it is a bit more straightforward. In this case, I just had to copy the fat jar to the remote node (not to HDFS) and launch the job. The commands were as follows.
  • scp -P5522 -i "<key file>" /path/to/fat.jar samikr@localhost:.
  • ssh -i "<key file>" samikr@localhost -p 5522 "/usr/bin/yarn jar fat.jar <complete.classname.for.ExecutableClass>"
More notes:
  • In order to avoid the compile-upload-test loop, it would be better to create a local hadoop-spark node, test the code there, and then use the above procedure to run the job on the cluster. There are some interesting notes here; possibly a topic for a later post.
  • Using an ssh config file is recommended to shorten some of the long ssh commands above.
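As an example of the last point, the first tunnel command and the login to the final host could be captured in ~/.ssh/config roughly as follows. The host aliases hadoop-hop and hadoop-final are hypothetical; the other values are the placeholders from the commands above. Each keyword corresponds to a command-line flag: AddressFamily inet is -4, ForwardAgent yes is -A, LocalForward is -L, Port is -p and IdentityFile is -i.

```
# Hypothetical aliases; placeholders match the commands above
Host hadoop-hop
    HostName intermediate-host
    User username
    IdentityFile /path/to/keyfile
    AddressFamily inet
    ForwardAgent yes
    LocalForward 5522 <final_host>:22

Host hadoop-final
    HostName localhost
    Port 5522
    User samikr
    IdentityFile /path/to/keyfile
```

With this in place, `ssh -f -N hadoop-hop` brings up the tunnel, `ssh -f -N -D 8020 hadoop-final` starts the SOCKS proxy, and the later commands shrink to forms like `scp /path/to/fat.jar hadoop-final:.` and `ssh hadoop-final "/usr/bin/yarn jar fat.jar <complete.classname.for.ExecutableClass>"`. The -D flag is deliberately kept on the command line rather than as DynamicForward in the config, so that ordinary logins to hadoop-final don't try to bind port 8020 again.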
Update (Sep 5, 2018): Added a few relevant links.

Thursday, December 22, 2016

Using ssh: multiple security algorithms and keys

While using ssh to connect to hosts, I recently faced an interesting issue. I usually use a double ssh tunnel to connect to various internal hosts that are behind a firewall. Typically, I set up the double tunnel with this command:

    ssh -4 -A -f -N -i "</path/to/keyfile>" username@intermediate-host -L5522:<final_host>:22

You will notice that the above command sets up a tunnel that forwards the ssh port of the final host to a local port (5522), so that I can run commands on the final host through localhost. This works pretty well, and I have used this tunnel to run hadoop commands or submit spark jobs.

I was recently trying to set up a tunnel to a new host to submit spark jobs. The tunnel setup went well, but when I tried to run a hadoop command over the tunnel, I got a message about the host key type being used for the ssh handshake.

    $ ssh -i "</path/to/keyfile>" samikr@localhost -p 5522 "/usr/bin/hadoop fs -put - hdfs:///user/samikr/datapipes.jar" < datapipes.jar
    Unable to negotiate with no matching host key type found. Their offer: ssh-rsa,ecdsa-sha2-nistp256,ssh-ed25519

I checked the config file, and its contents were as follows.

    $ cat config
    Host *
           HostkeyAlgorithms ssh-dss

Clearly I needed to add one of the accepted key types for this server, but I had some trouble specifying multiple algorithms on the same line. After some searching, this is what worked (note: no space after the comma).

    $ cat config
    Host *
           HostkeyAlgorithms ssh-dss,ssh-rsa

Now the command seemed to be going through, but I got another error message.

    Someone could be eavesdropping on you right now (man-in-the-middle attack)!
    It is also possible that a host key has just been changed.
    The fingerprint for the RSA key sent by the remote host is
    Please contact your system administrator.
    Add correct host key in /home/<user>/.ssh/known_hosts to get rid of this message.
    Offending DSA key in /home/<user>/.ssh/known_hosts:5
    RSA host key for [localhost]:5522 has changed and you have requested strict checking.
    Host key verification failed.

There was already an entry for [localhost]:5522 in known_hosts, but for the ssh-dss algorithm. I was hoping that another line for the new algorithm would be added to known_hosts for localhost, but apparently, with strict checking, only one entry per host is allowed. I had to remove that line, and then the command went through.