-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathSpark_Scala.txt
89 lines (63 loc) · 2.17 KB
/
Spark_Scala.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
Spark Scala Dataframe
# Reading a CSV File
val df = spark.read.option("header", "true").option("inferSchema", "true").csv("eg1.csv")
val df = spark.read.csv("eg1.csv")
For options refer to the following link -
https://spark.apache.org/docs/2.1.1/api/scala/index.html#org.apache.spark.sql.streaming.DataStreamReader
# Printing Top 5 rows of the dataframe
df.head(5)
# Printing in a Nice format Top 5 Rows of Dataframe
for (row <- df.head(5)){
println(row)
}
# Listing out all the columns
df.columns
# Statistical info about the numeric columns
df.describe().show()
# Selecting a column
df.select("A").show()
# Selecting multiple columns
df.select("A", "B").show()
df.select($"A", $"B").show()
# Create new column
df1 = df.withColumn("C", df("A")+df("B"))
# Datatypes and null values
df.printSchema()
# Rename a column
df2("C").as("3rdCol")
df2.select(df2("C").as("3rdCol")).show()
# Filtering data
- You need to import the following for scala filtering
import scala.implicits._
df.filter($"Close">480).show()
df.filter($"Close">480 && $"High"<500).show()
SQL way of doing it in spark
df.filter("Close>480").show()
df.filter("Close >480 and High<500").show()
Saving a filtered result
val ch1 = df.filter($"Close">480 && $"High"<500).collect()
Counting numberof entries which fit into the filtered query
df.filter($"Close">480 && $"High"<500).count()
Exact value match
df.filter($"Close"===480).show()
Correleation
df.select(corr("High","Low")).show()
For more such functions use the link below
https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.sql.functions$
# Grouping Data
df.groupBy("Name").mean().show()
# Aggregate Functions
df.select(sum("Sales")).show()
# Order By
df.orderBy($"Sales".desc).show()
#Converting String to DateTime object
df.select(to_date($"DoB", "dd-MM-yyyy").alias("date"))
df = df.withColumn("Date", to_date($"DoB", "dd-MM-yyyy"))
# Extracting Year from a given DateTime column
df = df.withColumn("Agg_id2", year($"Date"))
# Creating Aggregate Functions
df = df.withColumn("Agg_id_fin",$"Agg_id2"*100000 + $"Agg_id1")
# Converting DataType of Columns
df = df.withColumn("Date1", df("Date").cast("String"))
# Deleting a Column
df = df.drop("Date1")