// JOINS =====================================
// Using RDD
//emp table
id,name,age
1,Vinayak, 35
2,Nilesh, 37
3,Raju, 30
4,Karthik, 28
5,Shreshta,1
6,Siddhish, 2
// Load the employee CSV (header: id,name,age) as an RDD of raw text lines.
val emp = sc.textFile("C:/Users/Redirection/pateln7/Downloads/SPARK/emp.txt")
// The first line is the header; drop every line equal to it.
val first = emp.first()
val empfile = emp.filter(_ != first)
// Key each record by id, giving (id, (name, age)). Fields are trimmed because
// the sample data has spaces after some of the commas.
val empmap = empfile.map { line =>
  val fields = line.split(",").map(_.trim)
  (fields(0).toInt, (fields(1), fields(2).toInt))
}
empmap.foreach(println)
// Employees whose age is greater than 30.
val output = empmap.filter { case (_, (_, age)) => age > 30 }
//payroll table
id,dept,sal
1,dev, 3500
2,dev, 3700
3,qa, 3000
4,qa, 2800
5,prod,1000
6,prod, 2000
// Load the payroll CSV (header: id,dept,sal) as an RDD of raw text lines.
val payroll = sc.textFile("C:/Users/Redirection/pateln7/Downloads/SPARK/payroll.txt")
// The first line is the header; drop every line equal to it.
val first = payroll.first()
val payrollfile = payroll.filter(_ != first)
// Key each record by id, giving (id, (dept, sal)), so it can be joined with
// another pair RDD keyed the same way.
val payrollmap = payrollfile.map { line =>
  val cols = line.split(",").map(_.trim)
  (cols(0).toInt, (cols(1), cols(2).toInt))
}
payrollmap.foreach(println)
// Payroll rows whose salary is greater than 2000.
val output = payrollmap.filter { case (_, (_, sal)) => sal > 2000 }
(1,(dev,3500))
(2,(dev,3700))
(4,(qa,2800))
(3,(qa,3000))
// Inner join of the two pair RDDs on their id key.
// Each result has the shape (id, ((name, age), (dept, sal))).
val joined = empmap.join(payrollmap)
joined.foreach(row => println(row))
(1,((Vinayak,35),(dev,3500)))
(3,((Raju,30),(qa,3000)))
(5,((Shreshta,1),(prod,1000)))
(4,((Karthik,28),(qa,2800)))
(6,((Siddhish,2),(prod,2000)))
(2,((Nilesh,37),(dev,3700)))
// Filter the joined rows: age > 30 and sal > 2000
// Destructure (id, ((name, age), (dept, sal))) and keep rows with age > 30 and sal > 2000.
val output = joined.filter { case (_, ((_, age), (_, sal))) => age > 30 && sal > 2000 }
// Using DataFrame
//emp table
id,name,age
1,Vinayak, 35
2,Nilesh, 37
3,Raju, 30
4,Karthik, 28
5,Shreshta,1
6,Siddhish, 2
// Load the employee CSV (header: id,name,age) as an RDD of raw text lines.
val emp = sc.textFile("C:/Users/Redirection/pateln7/Downloads/SPARK/emp.txt")
// The first line is the header; drop every line equal to it.
val first = emp.first()
val empfile = emp.filter(_ != first)
// Parse each line into a flat (id, name, age) tuple; toDF needs one tuple
// element per column.
val empmap = empfile.map { line =>
  val fields = line.split(",").map(_.trim)
  (fields(0).toInt, fields(1), fields(2).toInt)
}
empmap.foreach(println)
// Name the columns while converting the RDD of tuples to a DataFrame.
val empdf = empmap.toDF("id", "name", "age")
//payroll table
id,dept,sal
1,dev, 3500
2,dev, 3700
3,qa, 3000
4,qa, 2800
5,prod,1000
6,prod, 2000
// Load the payroll CSV (header: id,dept,sal) as an RDD of raw text lines.
val payroll = sc.textFile("C:/Users/Redirection/pateln7/Downloads/SPARK/payroll.txt")
// The first line is the header; drop every line equal to it.
val first = payroll.first()
val payrollfile = payroll.filter(_ != first)
// Parse each line into a flat (id, dept, sal) tuple; toDF needs one tuple
// element per column.
val payrollmap = payrollfile.map { line =>
  val cols = line.split(",").map(_.trim)
  (cols(0).toInt, cols(1), cols(2).toInt)
}
payrollmap.foreach(println)
// Name the columns while converting the RDD of tuples to a DataFrame.
val payrolldf = payrollmap.toDF("id", "dept", "sal")
scala> val dfjoined = empdf.join(payrolldf, empdf("id")===payrolldf("id"))
dfjoined: org.apache.spark.sql.DataFrame = [id: int, name: string ... 4 more fields]
scala> dfjoined.show
+---+--------+---+---+----+----+
| id|    name|age| id|dept| sal|
+---+--------+---+---+----+----+
|  1| Vinayak| 35|  1| dev|3500|
|  6|Siddhish|  2|  6|prod|2000|
|  3|    Raju| 30|  3|  qa|3000|
|  5|Shreshta|  1|  5|prod|1000|
|  4| Karthik| 28|  4|  qa|2800|
|  2|  Nilesh| 37|  2| dev|3700|
+---+--------+---+---+----+----+
// Using SQLContext
// Join employees with payroll on id, expose the result to Spark SQL as a
// temporary view named "joinedemp", and query it.
val dfjoined = empdf.join(payrolldf, empdf("id") === payrolldf("id"))
// createOrReplaceTempView replaces registerTempTable, which is deprecated
// since Spark 2.0.
dfjoined.createOrReplaceTempView("joinedemp")
val sqlContext = dfjoined.sqlContext
// Keep the query result itself in `output`. show() returns Unit, so chaining
// it onto the definition would leave `output` without the result.
val output = sqlContext.sql("select * from joinedemp where age>30 and sal> 3500")
output.show()