Spark Interview Preparation

Q1. Write a Spark core program to find the dept-wise number of employees (count) and the expense on salaries (sum of amounts), given CSV files of emp and dept information (or given two RDDs, emp and dept).

package com.vgtst.spark

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD._


object RddJoin {
 def main(args: Array[String]) = {

    //Start the Spark context
    val conf = new SparkConf().setAppName("Rdd Join").setMaster("local")
    val sc = new SparkContext(conf)
     // Emp: id, name, dept id and salary
    // Result should be dept-wise emp count and total expense on salary:
    // dept no, dept name, emp count in dept and total salary per dept
    val emp = sc.parallelize(Seq((1,"Mukesh",10,100), (2,"Rajesh",20,200), (3,"Mahesh",30,300), (4,"Suresh",35,350), (5,"Rakesh",30,300)))
    val dept = sc.parallelize(Seq(("hadoop",10), ("spark",20), ("hive",30), ("sqoop",40)))
 
    // The third field (dept id) is the key for the emp RDD
    val key_emp = emp.keyBy(t => t._3)

    // The second field (dept id) is the key for the dept RDD
    val key_dept = dept.keyBy(t => t._2)
 
    // Inner Join
    val join_data = key_emp.join(key_dept)
    // Other joins can be used if the question changes:
    // Left Outer Join  -- key_emp.leftOuterJoin(key_dept)
    // Right Outer Join -- key_emp.rightOuterJoin(key_dept)
    // Full Outer Join  -- key_emp.fullOuterJoin(key_dept)
 
    // Reshape the joined data into flat tuples: (dept id, dept name, emp id, emp name, salary)
    val clean_joined_data = join_data.map(t => (t._1, t._2._2._1, t._2._1._1, t._2._1._2, t._2._1._4))
    val clean_joined_keydata = clean_joined_data.keyBy(t => t._1)
    // Per-department employee count and salary total
    val cnt = clean_joined_data.map(t => (t._1, 1)).reduceByKey(_ + _).keyBy(t => t._1)
    val amt = clean_joined_data.map(t => (t._1, t._5)).reduceByKey(_ + _).keyBy(t => t._1)

    val out_data = cnt.join(amt)
    // Join back to pick up the department name, then keep one row per department
    val result_data_rdd = clean_joined_keydata.join(out_data)
    val result_data = result_data_rdd.map(t => (t._2._1._1, t._2._1._2, t._2._2._1._2, t._2._2._2._2)).distinct()
    result_data.foreach(println)

    //Stop the Spark context
    sc.stop
  }

}
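
The question also allows the emp and dept data to come from CSV files rather than hard-coded sequences. In that case only the RDD construction changes and the join and aggregation above stay the same. A minimal sketch, assuming hypothetical header-less files emp.csv (id,name,deptid,salary) and dept.csv (dname,deptid); the two vals are drop-in replacements for the parallelize calls above:

// Hypothetical CSV inputs; adjust the paths and column order to the actual files
val emp = sc.textFile("/path/to/emp.csv")
  .map(_.split(","))
  .map(a => (a(0).toInt, a(1), a(2).toInt, a(3).toInt))   // (id, name, dept id, salary)
val dept = sc.textFile("/path/to/dept.csv")
  .map(_.split(","))
  .map(a => (a(0), a(1).toInt))                           // (dept name, dept id)
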
Using DataFrames:

package com.vgtst.spark

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.functions.col

object JoinDF {
  case class employee( eid: Int, name:String, deptid: Int, salary : Int)
  case class department( did: Int, dname:String)
  def main(args: Array[String]) = {
     //Start the Spark context
    val conf = new SparkConf().setAppName("JoinDF").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
   import sqlContext.implicits._
   val emp = sqlContext.createDataFrame(
   employee(1,"Mukesh",10,100) :: employee(2,"Rajesh",20,200) :: employee(4,"Suresh",35,350)
   :: employee(5,"Rakesh",30,300) :: employee(3,"Mahesh",30,300):: Nil)

   val dept = sqlContext.createDataFrame( department(10,"hadoop") ::department(20,"spark") ::
       department(30,"hive") :: department(40,"sqoop") ::Nil)
   val Temp = emp.as("TabEmp")
   val Tdept = dept.as("TabDept")

   // Join emp with dept and keep only the columns needed for the aggregations
   val joined_df = Temp.join(Tdept, col("TabEmp.deptid") === col("TabDept.did"), "inner")
     .select(col("TabEmp.eid"), col("TabEmp.name"), col("TabEmp.salary"), col("TabDept.did").alias("did"))
   // Per-department employee count
   val grp = joined_df.groupBy("did").count().select(col("did").alias("cdid"), col("count").alias("empcnt"))
   val j_df = joined_df.as("jdf")
   val C_grp = grp.as("Cgrp")
 
   // Per-department salary total
   val grp2 = joined_df.groupBy("did").sum("salary").select(col("did").alias("sdid"), col("sum(salary)").alias("Expense"))
   val S_grp = grp2.as("Sgrp")
   // Alternative "spreadsheet / analytical" style: keep one row per employee with the dept aggregates attached
   //val res_df = j_df.join(C_grp, col("jdf.did") === col("Cgrp.cdid"), "inner").select(col("jdf.eid"),col("jdf.name"),col("jdf.salary"),col("jdf.did").alias("deptid"),col("Cgrp.empcnt"))
   //                 .join(S_grp,col("deptid") === col("Sgrp.sdid"), "inner").select(col("deptid"),col("empcnt"),col("Expense"))
 
   // Department-wise result: dept id, employee count and total salary expense
   val res_df = C_grp.join(S_grp, col("cdid") === col("sdid"), "inner")
     .select(col("cdid").alias("Deptid"), col("empcnt"), col("Expense"))
   res_df.show()

   //Stop the Spark context
   sc.stop
 }
}
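
The two separate aggregations and the extra join can also be collapsed into a single groupBy with agg. A sketch that would sit inside main, reusing the joined_df built above:

import org.apache.spark.sql.functions.{count, sum}

// One pass: employee count and salary total per department
val res_df2 = joined_df.groupBy("did")
  .agg(count("eid").alias("empcnt"), sum("salary").alias("Expense"))
  .withColumnRenamed("did", "Deptid")
res_df2.show()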

1. Advantages of Spark over MapReduce
   

MapReduce vs Spark:

MapReduce: the highest-level unit of computation is a job. A job loads data, applies a map function, shuffles it, applies a reduce function, and writes the data back out to persistent storage.
Spark: the highest-level unit of computation is an application. A Spark application can be used for a single batch job, an interactive session with multiple jobs, or a long-lived server continually satisfying requests.

MapReduce: starts a new process for each task.
Spark: an application can have processes running on its behalf even when it is not running a job.

MapReduce: slow task startup time.
Spark: extremely fast task startup time.

MapReduce: persistent (on-disk) data storage.
Spark: in-memory data storage (see the sketch below).

MapReduce: computation sometimes spills over to the hard drives of the local node.
Spark: in-memory computation.
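
To make the in-memory point concrete, here is a minimal sketch (the input path is hypothetical): an RDD is persisted in executor memory, so the second action is served from the cache instead of re-reading HDFS.

import org.apache.spark.storage.StorageLevel

val lines = sc.textFile("/some/input/path")   // hypothetical input path
lines.persist(StorageLevel.MEMORY_ONLY)       // equivalent to lines.cache()
lines.count()                                 // first action reads HDFS and fills the cache
lines.count()                                 // later actions reuse the in-memory partitions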


2. Text file processing in Spark

scala> val textFile = sc.textFile("README.md")
textFile: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:27

scala> textFile.count()
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://abcd101.mydomain.my-cloud.com:8020/user/abc/README.md

----- This is due to lazy evaluation (late binding): the path is only checked when an action such as count() runs, and the relative path resolved to the HDFS home directory, where README.md does not exist ------------
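
A quick sketch of the same behaviour (the path below is hypothetical): textFile only records the lineage, and the missing-path error is raised only when an action forces evaluation.

val missing = sc.textFile("no_such_file.txt")   // hypothetical path; returns immediately, no error yet
// missing.count()                              // the InvalidInputException would be thrown here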

scala> val textFile = sc.textFile("/user/hive/warehouse/movie_bucket/000000_0")
textFile: org.apache.spark.rdd.RDD[String] = /user/hive/warehouse/movie_bucket/000000_0 MapPartitionsRDD[3] at textFile at <console>:27

scala> textFile.count()    // Number of lines in this RDD
res1: Long = 1058

scala> textFile.first()    // First line in this RDD
res2: String = 1        Toy Story       1995

scala> val linesWith1990 = textFile.filter(line => line.contains("1990"))
linesWith1990: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at filter at <console>:29

scala> linesWith1990.count()  // Movies released in 1990 in this bucket file
res3: Long = 73

scala> textFile.filter(line => line.contains("1990")).count()  // verify the count directly
res4: Long = 73

scala> exit
---------------------------- Move all movies released in 1990 to a text file -------------


scala> linesWith1990.partitions.length
res1: Int = 2

scala> linesWith1990.map(_.split("\t")(1)).collect
res1: Array[String] = Array(Sorority House Massacre II, Slumber Party Massacre III, The, Air America, House Party, Hot Spot, The, Hamlet, Long Walk Home, The, Bird on a Wire, Predator 2, Flatliners, Puppet Master II, All the Vermeers in New York, Misery, Jacob's Ladder, Teenage Mutant Ninja Turtles, Creature Comforts, Mo' Better Blues, Pacific Heights, Two Jakes, The, Bonfire of the Vanities, Come See the Paradise, Awakenings, Stanley & Iris, Repossessed, Bride of Re-Animator, Robocop 2, Guardian, The, Total Recall, Problem Child, Arachnophobia, Dick Tracy, Leatherface: Texas Chainsaw Massacre III, Gate II: Trespassers, The, Rocky V, Heart Condition, Tales from the Darkside: The Movie, Edward Scissorhands, Sheltering Sky, The, Men Don't Leave, My Blue Heaven, NeverEnding Story II: The N...

scala> linesWith1990.saveAsTextFile("/home/abc/mukesh/movies1990.txt")
// Remember the original RDD won't change, so the saved lines still contain the year info as well.

------------------------  Checking the output on HDFS -----------------
[abc@abcd101 mukesh]$ hdfs dfs -ls /home/abc/mukesh/movies1990.txt
Found 3 items
-rw-r--r--   2 abc supergroup          0 2019-01-19 11:22 /home/abc/mukesh/movies1990.txt/_SUCCESS
-rw-r--r--   2 abc supergroup       1297 2019-01-19 11:22 /home/abc/mukesh/movies1990.txt/part-00000
-rw-r--r--   2 abc supergroup        759 2019-01-19 11:22 /home/abc/mukesh/movies1990.txt/part-00001
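
Note that the output is split across two part files because linesWith1990 has two partitions. If a single output file is preferred, the partitions can be collapsed before saving, as in this sketch (the output directory is hypothetical); the 1995 test case below uses repartition(1) for the same effect:

linesWith1990.coalesce(1).saveAsTextFile("/home/abc/mukesh/movies1990_single")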

----------------------------------------------------------------------------------

Test case 2 for 1995 movies:

scala> val textFile = sc.textFile("/user/hive/warehouse/movie_bucket/000000_0")
textFile: org.apache.spark.rdd.RDD[String] = /user/hive/warehouse/movie_bucket/000000_0 MapPartitionsRDD[1] at textFile at <console>:27

scala> val linesWith1995 = textFile.filter(line => line.contains("1995"))
linesWith1995: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at filter at <console>:29

scala> linesWith1995.partitions.length
res3: Int = 2

scala> linesWith1995.first()
res4: String = 1        Toy Story       1995

scala> linesWith1995.count()
res5: Long = 321

scala> linesWith1995.map(_.split("\t")(1)).repartition(1).saveAsTextFile("/home/abc/mukesh/movies1995")

scala> linesWith1995.first()                      // Remember the original RDD won't change
res7: String = 1        Toy Story       1995

------------------------  Checking the output on HDFS -----------------
[abc@abcd101 mukesh]$ hdfs dfs -ls /home/abc/mukesh/movies1995
Found 2 items
-rw-r--r--   2 abc supergroup          0 2019-01-19 11:32 /home/abc/mukesh/movies1995/_SUCCESS
-rw-r--r--   2 abc supergroup       5403 2019-01-19 11:32 /home/abc/mukesh/movies1995/part-00000
[abc@abcd101 mukesh]$ hadoop fs -cat /home/abc/mukesh/movies1995/part-00000 | wc -l
321
[abc@abcd101 mukesh]$ hadoop fs -cat /home/abc/mukesh/movies1995/part-00000 | tail
American President, The
GoldenEye
Sudden Death
Tom and Huck

3. Loading and working with files (sequence and text files)

scala> val movie= sc.textFile("/home/abc/mukesh/movies1990.txt/part-00000")
movie: org.apache.spark.rdd.RDD[String] = /home/abc/mukesh/movies1990.txt/part-00000 MapPartitionsRDD[1] at textFile at <console>:27

scala> movie.first()
res0: String = 3942     Sorority House Massacre II      1990

scala> movie.take(5)
res1: Array[String] = Array(3942        Sorority House Massacre II      1990, 3940      Slumber Party Massacre III, The 1990, 3841      Air America     1990, 3773     House Party      1990, 3765      Hot Spot, The   1990)

scala> val moviepairs:org.apache.spark.rdd.RDD[(String,String)] = movie.map(x=>x.split("\t")).map(x => (x(0).toString(),x(1).toString()))
moviepairs: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[3] at map at <console>:29

scala> moviepairs.first()
res2: (String, String) = (3942,Sorority House Massacre II)

scala> moviepairs.saveAsSequenceFile("/home/abc/mukesh/movieSeq")

scala> val movietxt = sc.sequenceFile("/home/abc/mukesh/movieSeq")
// Without a type annotation the key/value types of the sequence file cannot be inferred,
// so the explicitly annotated form below is used instead.


[abc@abcd101 mukesh]$ hdfs dfs -ls /home/abc/mukesh/movieSeq
Found 3 items
-rw-r--r--   2 abc supergroup          0 2019-01-24 02:38 /home/abc/mukesh/movieSeq/_SUCCESS
-rw-r--r--   2 abc supergroup        832 2019-01-24 02:38 /home/abc/mukesh/movieSeq/part-00000
-rw-r--r--   2 abc supergroup        756 2019-01-24 02:38 /home/abc/mukesh/movieSeq/part-00001
[abc@abcd101 mukesh]$

scala>  val movietxt:org.apache.spark.rdd.RDD[(String,String)]=sc.sequenceFile("/home/abc/mukesh/movieSeq")
movietxt: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[6] at sequenceFile at <console>:27

scala> movietxt.take(3)
res4: Array[(String, String)] = Array((3942,Sorority House Massacre II), (3940,Slumber Party Massacre III, The), (3841,Air America))

scala> movietxt.saveAsTextFile("/home/abc/mukesh/movietxt1990.txt")


[abc@abcd101 mukesh]$ hdfs dfs -cat /home/abc/mukesh/movietxt1990.txt/part-00000 | tail -5
(3106,Come See the Paradise)
(3105,Awakenings)
(3103,Stanley & Iris)
(3031,Repossessed)
(3013,Bride of Re-Animator)
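
saveAsTextFile on a pair RDD writes each tuple with its parentheses, as seen above. If a plain tab-separated layout is wanted instead, the pairs can be formatted before saving; a sketch with a hypothetical output directory:

movietxt.map { case (id, name) => id + "\t" + name }
        .saveAsTextFile("/home/abc/mukesh/movietab1990")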



