Hive: create table partitioned by year and month of TIMESTAMP column
I was wondering whether it is possible to create a table partitioned by the YEAR and MONTH of a TIMESTAMP column, for example with a command like the following:
USE database;
CREATE TABLE credit_transactions (
    processdate TIMESTAMP,
    requestprocess STRING,
    cardno_hash STRING
)
PARTITIONED BY (YEAR(processdate) INT, MONTH(processdate) INT)
CLUSTERED BY (cardno_hash) INTO 50 BUCKETS
STORED AS ORC
TBLPROPERTIES ("transactional"="true");
Then, is it possible to simply load data from a CSV file and have Hive partition the data automatically? For example:
LOAD DATA LOCAL INPATH '/root/usr/transact.csv' OVERWRITE INTO TABLE credit_transactions;
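(Hive does not allow expressions such as YEAR(processdate) as partition columns, so a common workaround is to declare separate year/month partition columns and fill them with a dynamic-partition INSERT ... SELECT from a plain staging table. The sketch below assumes that pattern; the staging table name and the tx_year/tx_month column names are made up for illustration.)

-- Sketch only: the staging table and the partition column names are assumptions.
CREATE TABLE credit_transactions_staging (
    processdate    TIMESTAMP,
    requestprocess STRING,
    cardno_hash    STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE;

CREATE TABLE credit_transactions (
    processdate    TIMESTAMP,
    requestprocess STRING,
    cardno_hash    STRING
)
PARTITIONED BY (tx_year INT, tx_month INT)
CLUSTERED BY (cardno_hash) INTO 50 BUCKETS
STORED AS ORC
TBLPROPERTIES ("transactional"="true");

-- Load the raw CSV into the staging table first...
LOAD DATA LOCAL INPATH '/root/usr/transact.csv' OVERWRITE INTO TABLE credit_transactions_staging;

-- ...then let dynamic partitioning derive the partition values from the TIMESTAMP column.
SET hive.exec.dynamic.partition = true;
SET hive.exec.dynamic.partition.mode = nonstrict;

INSERT INTO TABLE credit_transactions PARTITION (tx_year, tx_month)
SELECT processdate,
       requestprocess,
       cardno_hash,
       YEAR(processdate)  AS tx_year,
       MONTH(processdate) AS tx_month
FROM credit_transactions_staging;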
See also questions close to this topic
File watch in S3 and send the particular path to a program
I am new to S3 bucket processing. I run my Hive scripts on an EC2 instance, and their results are saved as .csv files into their respective folders in S3, as defined by each script. My requirement is to watch for whenever a new .csv file is written or overwritten in any of those S3 folders, send the full path of that .csv to my Python program, and run the program so that it saves its output.csv in the same folder. It would be helpful if anyone could suggest some approaches I could pick up and implement.
Hive: How to deal with files that have a varying number of fields?
Dealing with a file on HDFS that has a varying number of fields separated by ','. For instance:
uid1, eid01, para1, para2, para3, para4, para5, timestamp
uid1, eid12, para56, para57, timestamp
uid3, eid42, para102, timestamp
The number of fields is not fixed.
Now I want to put these data into a Hive table that has 4 columns, with all of the 'para..' fields in one column, like:
uid    eid    para                               datatime
uid1   eid01  para1, para2, para3, para4, para5  timestamp
uid1   eid12  para56, para57                     timestamp
uid3   eid42  para102                            timestamp
The amount of data is too large to deal with using tools like AWK. Is there any other solution?
Any help is appreciated.
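(One hedged option, assuming every line has the shape 'uid, eid, one or more para fields, timestamp', is to expose the raw lines as a single STRING column and split each line with regexp_extract. The table names and the HDFS location below are placeholders, not from the question.)

-- Sketch only: table names and location are placeholders.
CREATE EXTERNAL TABLE raw_lines (line STRING)
LOCATION '/user/hive/raw_lines';

CREATE TABLE parsed AS
SELECT trim(regexp_extract(line, '^([^,]+),([^,]+),(.*),([^,]+)$', 1)) AS uid,
       trim(regexp_extract(line, '^([^,]+),([^,]+),(.*),([^,]+)$', 2)) AS eid,
       trim(regexp_extract(line, '^([^,]+),([^,]+),(.*),([^,]+)$', 3)) AS para,     -- everything between eid and the last field
       trim(regexp_extract(line, '^([^,]+),([^,]+),(.*),([^,]+)$', 4)) AS datatime
FROM raw_lines;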
Unable to query using file in Data Proc Hive Operator
I am unable to run a query from a .sql file in DataProcHiveOperator, even though the documentation says that we can query using a file. Link to the documentation: Here
It works fine when I give the query directly. Here is my sample code, which works fine with the query written inline:
HiveInsertingTable = DataProcHiveOperator(
    task_id='HiveInsertingTable',
    gcp_conn_id='google_cloud_default',
    query='CREATE TABLE TABLE_NAME(NAME STRING);',
    cluster_name='cluster-name',
    region='us-central1',
    dag=dag)
Querying with a file:
HiveInsertingTable = DataProcHiveOperator(
    task_id='HiveInsertingTable',
    gcp_conn_id='google_cloud_default',
    query='gs://us-central1-bucket/data/sample_hql.sql',
    queri_uri="gs://us-central1-bucket/data/sample_hql.sql",
    cluster_name='cluster-name',
    region='us-central1',
    dag=dag)
There is no error in the sample_hql.sql script itself. The operator reads the file location as a query and throws the following error:
Query: 'gs://bucketpath/filename.q'
Error occuring - cannot recognize input near 'gs' ':' '/'
A similar issue has also been raised Here.
How to efficiently unpivot MULTIPLE columns in Hive?
My data is structured like the table below:
| Name | Foo_A | Foo_B | Foo_C | Bar_A | Bar_B | Bar_C |
--------------------------------------------------------
| abcd | 16    | 32    | 14    | 52    | 41    | 17    |
| ...  | ...   | ...   | ...   | ...   | ...   | ...   |
I am looking to query the data in Hive in a way such that it looks like this:
| Name | Class | FooVal | BarVal |
----------------------------------
| abcd | A     | 16     | 52     |
| abcd | B     | 32     | 41     |
| abcd | C     | 14     | 17     |
| ...  | ...   | ...    | ...    |
I am already aware of, and am using, UNION ALL, but what would be a more efficient way of doing this using "LATERAL VIEW explode" with a map data type?
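(A hedged sketch of the map-based approach, assuming the source table is called my_table and has exactly the columns shown above: each row is fanned out into the three classes in a single scan, instead of the table being read once per class as with UNION ALL.)

-- Sketch only: my_table is a placeholder name for the source table above.
SELECT Name,
       class   AS Class,
       vals[0] AS FooVal,
       vals[1] AS BarVal
FROM my_table
LATERAL VIEW explode(map(
    'A', array(Foo_A, Bar_A),
    'B', array(Foo_B, Bar_B),
    'C', array(Foo_C, Bar_C))) t AS class, vals;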
Does OR give better performance than IN for Hive queries?
I have the following two queries in Hive to get a specific result.
select * from table1 where col1 IN (a, b, c);
select * from table1 where col1=a OR col1=b OR col1=c;
As per my understanding, IN will be converted internally into a sequence of ORs. I executed both locally in spark-sql but did not find any performance difference (execution time, amount of data scanned, etc.). So what difference can we expect between IN and OR in terms of functionality? Any help will be appreciated.
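(One hedged way to check this directly, assuming a, b and c stand for literal values, is to compare the plans Hive compiles for the two forms; if both produce the same filter predicate, no functional or performance difference should be expected.)

-- Sketch only: a, b, c are the placeholder values from the question.
EXPLAIN SELECT * FROM table1 WHERE col1 IN (a, b, c);
EXPLAIN SELECT * FROM table1 WHERE col1 = a OR col1 = b OR col1 = c;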
Calculate residual amount in dataframe column
I have a "capacity" dataframe:
scala> sql("create table capacity (id String, capacity Int)"); scala> sql("insert into capacity values ('A', 50), ('B', 100)"); scala> sql("select * from capacity").show(false) +---+--------+ |id |capacity| +---+--------+ |A |50 | |B |100 | +---+--------+
I have another "used" dataframe with following information:
scala> sql ("create table used (id String, capacityId String, used Int)"); scala> sql ("insert into used values ('item1', 'A', 10), ('item2', 'A', 20), ('item3', 'A', 10), ('item4', 'B', 30), ('item5', 'B', 40), ('item6', 'B', 40)") scala> sql("select * from used order by capacityId").show(false) +-----+----------+----+ |id |capacityId|used| +-----+----------+----+ |item1|A |10 | |item3|A |10 | |item2|A |20 | |item6|B |40 | |item4|B |30 | |item5|B |40 | +-----+----------+----+
Column "capacityId" of the "used" dataframe is foreign key to column "id" of the "capacity" dataframe. I want to calculate the "capacityLeft" column which is residual amount at that point of time.
+-----+----------+----+------------+
|id   |capacityId|used|capacityLeft|
+-----+----------+----+------------+
|item1|A         |10  |40          |  <- 50 (capacity of 'A') - 10
|item3|A         |10  |30          |  <- 40 - 10
|item2|A         |20  |10          |  <- 30 - 20
|item6|B         |40  |60          |  <- 100 (capacity of 'B') - 40
|item4|B         |30  |30          |  <- 60 - 30
|item5|B         |40  |-10         |  <- 30 - 40
+-----+----------+----+------------+
In the real scenario, a "createdDate" column is used for ordering the rows of the "used" dataframe.
Spark version: 2.2
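(A hedged Spark SQL sketch, assuming the "used" table really has the createdDate column mentioned above: join to "capacity" and subtract a running sum of "used" per capacityId.)

-- Sketch only: assumes used.createdDate exists, as described for the real scenario.
SELECT u.id,
       u.capacityId,
       u.used,
       c.capacity
         - SUM(u.used) OVER (PARTITION BY u.capacityId
                             ORDER BY u.createdDate
                             ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS capacityLeft
FROM used u
JOIN capacity c
  ON u.capacityId = c.id;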
Spark Clustered By/Bucket by dataset not using memory
I recently came across Spark bucketby/clusteredby here.
I tried to mimic this for a 1.1 TB source file from S3 (already in Parquet). The plan is to completely avoid a shuffle, as most of the datasets are always joined on the "id" column. Here is what I am doing:
myDf.repartition(20)
  .write.partitionBy("day")
  .option("mode", "DROPMALFORMED")
  .option("compression", "snappy")
  .option("path", "s3://my-bucket/folder/1year_data_bucketed/")
  .mode("overwrite")
  .format("parquet")
  .bucketBy(20, "id")
  .sortBy("id")
  .saveAsTable("myTable1YearBucketed")
On a different EMR cluster, I create a table and access it.
CREATE TABLE newtable_on_diff_cluster (id string, day date, col1 double, col2 double)
USING PARQUET
OPTIONS (
  path "s3://my-bucket/folder/1year_data_bucketed/"
)
CLUSTERED BY (id) INTO 20 BUCKETS
Then I create a Scala dataframe and join it with another table that is bucketed into the same 20 buckets on the id column.
val myTableBucketedDf = spark.table("newtable_on_diff_cluster")
val myDimTableBucketedDf = spark.table("another_table_with_same_bucketing")
val joinedOutput = myTableBucketedDf.join(myDimTableBucketedDf, "id")
joinedOutput.show()
Here are my questions:
- I see that even with repartition, the shuffle is still removed in the explain plan, which is good. Is there any issue with using repartition, partitionBy, and bucketBy in the above fashion?
- Judging from Ganglia, the above join does not look like it is using memory on my EMR cluster. When joining regular Parquet files without bucketing, they seem to run faster in memory for a smaller number of day partitions; I haven't tested it for more days. How exactly is the join processed here? Is there any way to avoid the CREATE TABLE SQL statement and instead use the Parquet metadata to define the table schema from Scala? I don't want to repeat the column names and data types when they are already available in the Parquet files.
- What is the ideal number of buckets, or individual file size after bucketBy, in terms of the available memory on the executors? If the number of unique values in the id column is in the ~100 MM range, then, if I understand correctly, 20 buckets will give each bucket roughly 5 MM unique ids. I understand that sortBy here is not respected because Spark produces multiple files per bucket with bucketBy. What is the recommendation for repartition / final file sizes / number of buckets in this case?
How do you add partitions to a partitioned table in Apache Presto running in Amazon EMR?
I'm running Apache Presto 0.212 in EMR 5.19.0, because AWS Athena doesn't support the user defined functions that Presto supports. I'm using EMR configured to use the glue schema. I have pre-existing Parquet files that already exist in the correct partitioned format in S3.
It appears that recent Presto versions have removed the ability to create and view partitions. This raises the question: How do you add individual partitions? I can use the Athena console in AWS and run
MSCK REPAIR TABLE mytable; and that creates the partitions correctly, which I can then query successfully using the Presto CLI or HUE. However, how do I do this in Presto?
If I try this in presto-cli on the EMR master node:
use hive.default;
INSERT INTO "mytable$partitions" VALUES (2018, 9, 20)
I get an error saying
java.sql.SQLException: Query failed (#20181113_172115_00004_yywie): com.facebook.presto.connector.system.SystemTableHandle cannot be cast to com.facebook.presto.hive.HiveTableHandle
(Note that I'm using the database default in Glue to store the schema. That's where "default" comes from.)
The old ways of doing this in Presto have all been removed relatively recently (for example, alter table mytable add partition (p1=value, p2=value, p3=value) or INSERT INTO TABLE mytable PARTITION (p1=value, p2=value, p3=value)), although it appears they can still be found in the tests. They don't work: if I try to execute such queries in HUE or in the Presto CLI, I get errors.
However, in the Presto CLI I can view the partitions that exist, entering this query on the EMR master node:
use hive.default;
select * from "mytable$partitions";
Initially that query result is empty, because no partitions exist, of course. If I manually run MSCK REPAIR in Athena to create the partitions, then that query will show me all the partitions that have been created.
If I try using the Hive CLI on the EMR master node, it doesn't work either:
use default;
ALTER TABLE mytable ADD PARTITION (p1=2018, p2=9, p3=20) location 's3://bucketname/rootfolder/p1=2018/p2=9/p3=20/';
FAILED: SemanticException [Error 10001]: Table not found mytable
So... how, using the Presto-CLI, or using HUE, or even using the Hive CLI, can I add partitions to a partitioned table stored in S3? Now that Presto has removed the ability to do this, what is the way it is supposed to be done? Trying to follow earlier examples such as this one doesn't work.
While "MSCK REPAIR"works, it's an expensive way of doing this and causes a full S3 scan. I would prefer to add partitions individually rather than scan the entire S3 bucket to find existing partitions, especially when adding one new partition to a large table that already exists.
I also note this quote on the page Using the AWS Glue Data Catalog as the Metastore for Hive:
We recommend creating tables using applications through Amazon EMR rather than creating them directly using AWS Glue. Creating a table through AWS Glue may cause required fields to be missing and cause query exceptions.
There must be a way of doing this within EMR. What is it?
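(One hedged workaround, assuming the table is registered in the same Glue Data Catalog that Athena and EMR share: add the single partition through Athena's DDL rather than MSCK REPAIR, which avoids the full S3 scan. Since the partitions MSCK REPAIR created in Athena were already visible to Presto above, a partition added this way should show up the same way.)

-- Sketch only, run in the Athena console rather than on EMR:
ALTER TABLE mytable ADD IF NOT EXISTS
  PARTITION (p1 = 2018, p2 = 9, p3 = 20)
  LOCATION 's3://bucketname/rootfolder/p1=2018/p2=9/p3=20/';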
Hadoop Sorting: Custom Sort and Sort Optimization
Here is my understanding of the way sorting works in Hadoop, and based on that I have a few questions. I have gone over "Why mapreduce secondary sorting is not on composite key's compareTo()?" but I am basically questioning the Hadoop Definitive Guide. As per my understanding, sorting follows this order:
1. Custom sorting is looked for first; it is set using the job.setSortComparator() method. Is this understanding right?
1.1 The job.setSortComparator() method takes a RawComparator implementation, which has only the "raw" compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) method.
1.2 This means that the class needed to implement a custom sort needs to implement the compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) method.
1.3 This also means that it is this compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) method that will ultimately be called for my custom sort.
1.4 Now, if I create my custom class MyWritableComparator that implements the RawComparator interface and implements the compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) method, then it is as good as the optimized sorting from point 2.1.1, as this is also a byte-level comparison. Is my understanding right?
1.5 If I instead have my custom class MyWritableComparator extend WritableComparator (which by default already implements the compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) method and provides the compare(WritableComparable a, WritableComparable b) method), and then have my custom class override the compare(WritableComparable a, WritableComparable b) method, then during sorting: my MyWritableComparator instance will be used ===> the default implementation of the compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) method from WritableComparator will be called, which deserializes the bytes into WritableComparable objects ===> and then calls the compare(WritableComparable a, WritableComparable b) that my MyWritableComparator instance has overridden ===> and this is how the custom sort is achieved.
2. If the custom sorting order from above is not set, optimized sorting is looked for, which is set using WritableComparator.define().
2.1 WritableComparator.define() takes a WritableComparator, which has 2 crucial methods:
2.1.1 compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2), which according to the documentation is the hook for optimization because it does byte-level comparisons.
2.1.2 compare(WritableComparable a, WritableComparable b), which, if overridden, is as good as a custom sort order. If this is true, then how is it different from Job.setSortComparator(), which ideally is used for custom sorting as mentioned in point 1?
3. If optimized sorting is also not set via the above method, then the default natural sort order is used, which relies on WritableComparator, the default implementation.
As far as sort optimization is concerned, points 1.4 and 2.1.1 both achieve the same thing, but the Hadoop Definitive Guide suggests only point 2.1.1.
As far as custom sorting is concerned, points 1.5 and 2.1.2 achieve the same thing, but the Hadoop Definitive Guide suggests only point 1.5.
Also, why does job.setSortComparator() need a RawComparator implementation? Sorting is required only on keys in MR, right? Keys have to be WritableComparable, so the job.setSortComparator() method should instead have taken a WritableComparator implementation. Any sort needed on "values" in MR is handled by the developer anyway, as there is no API for sorting values, unlike keys, I believe. Is my understanding right?
Any insights would be very helpful.