|
Created by Nilesh Patel
almost 7 years ago
|
|
// JOINS ===================================== Using RDD //emp table id,name,age 1,Vinayak, 35 2,Nilesh, 37 3,Raju, 30 4,Karthik, 28 5,Shreshta,1 6,Siddhish, 2 val emp = sc.textFile("C:/Users/Redirection/pateln7/Downloads/SPARK/emp.txt") val first = emp.first val empfile = emp.filter(x=>x!=first) val empmap=empfile.map(x=>{ val splitted=x.split(",") val id=splitted(0).trim.toInt val name=splitted(1).trim val age=splitted(2).trim.toInt (id,(name,age)) }) empmap.foreach(println) val output = empmap.filter(x=>x._2._2>30) //payroll table id,dept,sal 1,dev, 3500 2,dev, 3700 3,qa, 3000 4,qa, 2800 5,prod,1000 6,prod, 2000 val payroll = sc.textFile("C:/Users/Redirection/pateln7/Downloads/SPARK/payroll.txt") val first = payroll.first val payrollfile = payroll.filter(x=>x!=first) val payrollmap=payrollfile.map(x=>{ val splitted=x.split(",") val id=splitted(0).trim.toInt val dept=splitted(1).trim val sal=splitted(2).trim.toInt (id,(dept,sal)) }) payrollmap.foreach(println) val output = payrollmap.filter(x=>x._2._2>2000) (1,(dev,3500)) (2,(dev,3700)) (4,(qa,2800)) (3,(qa,3000)) val joined=empmap.join(payrollmap) joined.foreach(println) (1,((Vinayak,35),(dev,3500))) (3,((Raju,30),(qa,3000))) (5,((Shreshta,1),(prod,1000))) (4,((Karthik,28),(qa,2800))) (6,((Siddhish,2),(prod,2000))) (2,((Nilesh,37),(dev,3700))) age > 30 and sal > 2000 val output=joined.filter(x=>x._2._1._2>30 && x._2._2._2>2000) Using Dataframe id,name,age 1,Vinayak, 35 2,Nilesh, 37 3,Raju, 30 4,Karthik, 28 5,Shreshta,1 6,Siddhish, 2 val emp = sc.textFile("C:/Users/Redirection/pateln7/Downloads/SPARK/emp.txt") val first = emp.first val empfile = emp.filter(x=>x!=first) val empmap=empfile.map(x=>{ val splitted=x.split(",") val id=splitted(0).trim.toInt val name=splitted(1).trim val age=splitted(2).trim.toInt (id,name,age) }) empmap.foreach(println) val empdf = empmap.toDF("id","name","age") //payroll table id,dept,sal 1,dev, 3500 2,dev, 3700 3,qa, 3000 4,qa, 2800 5,prod,1000 6,prod, 2000 val payroll = sc.textFile("C:/Users/Redirection/pateln7/Downloads/SPARK/payroll.txt") val first = payroll.first val payrollfile = payroll.filter(x=>x!=first) val payrollmap=payrollfile.map(x=>{ val splitted=x.split(",") val id=splitted(0).trim.toInt val dept=splitted(1).trim val sal=splitted(2).trim.toInt (id,dept,sal) }) payrollmap.foreach(println) val payrolldf = payrollmap.toDF("id","dept","sal") scala> val dfjoined = empdf.join(payrolldf, empdf("id")===payrolldf("id")) dfjoined: org.apache.spark.sql.DataFrame = [id: int, name: string ... 4 more fields] scala> dfjoined.show +---+--------+---+---+----+----+ | id| name|age| id|dept| sal| +---+--------+---+---+----+----+ | 1| Vinayak| 35| 1| dev|3500| | 6|Siddhish| 2| 6|prod|2000| | 3| Raju| 30| 3| qa|3000| | 5|Shreshta| 1| 5|prod|1000| | 4| Karthik| 28| 4| qa|2800| | 2| Nilesh| 37| 2| dev|3700| +---+--------+---+---+----+----+ Using sqlcontext val dfjoined = empdf.join(payrolldf, empdf("id")===payrolldf("id")) dfjoined.registerTempTable("joinedemp") val sqlContext = dfjoined.sqlContext val output = sqlContext.sql("select * from joinedemp where age>30 and sal> 3500").show //
Want to create your own Notes for free with GoConqr? Learn more.