Save Spark dataframe as dynamic partitioned table in Hive
I have a sample application working that reads from CSV files into a dataframe. The dataframe can be stored to a Hive table in Parquet format using the method:

df.saveAsTable(tablename, mode)
The above code works fine, but I have so much data for each day that I want to dynamically partition the Hive table based on creationdate (a column in the table).

Is there any way to dynamically partition the dataframe and store it to the Hive warehouse? I want to refrain from hard-coding the insert statement using:

hivesqlcontext.sql("insert into table partition by(date)....")
This question can be considered an extension of: How to save DataFrame directly to Hive?

Any help is much appreciated.
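For reference, the hard-coded approach I want to avoid would look something like this (table and column names are only illustrative):

hivesqlcontext.sql("INSERT INTO TABLE mytable PARTITION (creationdate) SELECT col1, col2, creationdate FROM staging_table")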
4 Answers
I believe it works something like this: df is a dataframe with year, month and other columns.

df.write.partitionBy('year', 'month').saveAsTable(...)

or

df.write.partitionBy('year', 'month').insertInto(...)
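A minimal end-to-end sketch of the first variant (assuming a SparkSession with Hive support; the file path, database, table, and column names here are made up for illustration):

// read some CSV data that includes year and month columns
val df = spark.read.option("header", "true").csv("/path/to/input.csv")

// write it as a Hive table, dynamically partitioned by year and month;
// saveAsTable creates the table if it does not already exist
df.write
  .mode("append")
  .partitionBy("year", "month")
  .saveAsTable("mydb.events")

Note that insertInto, the second variant, requires the target table to already exist and matches columns by position rather than by name.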
Ok, so I was able to work it out with version 1.4: df.write().mode(SaveMode.Append).partitionBy("date").saveAsTable("Tablename"). However, this changes my date field to an integer value and removes the actual date. E.g. there are 9 unique dates in the column but they are now stored as 1,2,3..., and the folder name is date=1,2,3,... instead of date=20141121. Let me know if there is a way to do this.
– Chetandalal
Jul 16 '15 at 18:21
@subramaniam-ramasubramanian: please reply to the OP's question as an answer instead of editing the existing answer
– Ram Ghadiyaram
May 1 '17 at 10:59
I was able to write to a partitioned Hive table using:

df.write().mode(SaveMode.Append).partitionBy("colname").saveAsTable("Table")

I had to enable the following properties to make it work.
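hive.exec.dynamic.partition=true
hive.exec.dynamic.partition.mode=nonstrict

(These are the two properties referred to in the comment thread below; see the first reply for where to set them.)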
Where should I set the above 2 parameters? I tried logging into the hive shell and running the above commands, but it failed. I am sure I am doing it wrong. Could you please tell me where I can set these properties?
– Vrushank Doshi
Jun 23 '16 at 20:58
@VrushankDoshi You would set it in the spark program, right after you create your hiveContext:

val sparkConf = new SparkConf()
val sc = new SparkContext(sparkConf)
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext.setConf("hive.exec.dynamic.partition", "true")
hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
– MV23
Jun 29 '16 at 20:16
On my side this code overwrites but does not append any data. Why?
– enneppi
Mar 20 at 17:08
I also faced the same thing, but I resolved it using the following tricks.

When we make any table partitioned, the partitioned column becomes case sensitive.

The partitioned column should be present in the DataFrame with the same name (case sensitive). Code:
import org.apache.spark.sql.SaveMode

var dbName = "your database name"
var finaltable = "your table name"

// Dynamic partitioning must be enabled before inserting into a partitioned table
sparkSession.sql("SET hive.exec.dynamic.partition = true")
sparkSession.sql("SET hive.exec.dynamic.partition.mode = nonstrict")
sparkSession.sql("SET hive.exec.max.dynamic.partitions.pernode = 400")

// First check whether the table exists
if (sparkSession.sql("show tables in " + dbName).filter("tableName='" + finaltable + "'").collect().length == 0) {
  // If the table is not present, create it
  println("Table not present, creating table " + finaltable)
  sparkSession.sql("use " + dbName)
  sparkSession.sql("create table " + dbName + "." + finaltable + " (EMP_ID string, EMP_Name string, EMP_Address string, EMP_Salary bigint) PARTITIONED BY (EMP_DEP string)")
}

// The table exists now; insert the DataFrame in append mode
df.write.mode(SaveMode.Append).insertInto(dbName + "." + finaltable)
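One caveat worth adding (an assumption based on how insertInto works, not something stated in the original answer): insertInto matches DataFrame columns to the table schema by position, not by name, and for a partitioned table the partition column must come last. A small sketch of preparing the DataFrame accordingly:

// insertInto resolves columns by position, so select them in table order,
// with the partition column (EMP_DEP) last
val ordered = df.select("EMP_ID", "EMP_Name", "EMP_Address", "EMP_Salary", "EMP_DEP")
ordered.write.mode(SaveMode.Append).insertInto(dbName + "." + finaltable)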
Regarding df.write.mode(SaveMode.Append).insertInto(dbName + "." + finaltable): don't you need to mention partitionBy? For example df.write.mode(SaveMode.Append).partitionBy("EMP_DEP").insertInto(dbName + "." + finaltable)
– sri hari kali charan Tummala
Jan 31 at 11:43
No need, it's optional.
– Nilesh Shinde
Jan 31 at 14:47
It hasn't worked for me; the table count is zero.
– sri hari kali charan Tummala
Jan 31 at 17:01
My tables are existing tables in Hive.
– sri hari kali charan Tummala
Jan 31 at 17:01
This is what works for me. I set these settings and then put the data in partitioned tables.

from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
sqlContext.setConf("hive.exec.dynamic.partition", "true")
sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
Tried this partitionBy method. It only works at the RDD level; once the dataframe is created, most of the methods are DBMS-styled, e.g. groupBy and orderBy, but they don't serve the purpose of writing to different partition folders on Hive.
– Chetandalal
Jul 13 '15 at 13:01