Read XML with PySpark
I'm trying to read XML with PySpark, but I have some problems.
I'm running this code:
df_cli = (sqlContext.read
          .format('com.databricks.spark.xml')
          .options(rowTag='Cli')
          .load('MyFile.txt', schema=schema_xml))
and MyFile.txt:
<Cli Tp="1" Cd="8338" Autorzc="S">
  <Op Contrt="1" NatuOp="01" Mod="1304">
    <Venc v110=" 123" v120=" 123"/>
  </Op>
  <Op Contrt="2" NatuOp="01" Mod="1304">
    <Venc v110=" 123" v120=" 123"/>
  </Op>
</Cli>
<Cli Tp="2" Cd="8568" Autorzc="N">
  <Op Contrt="3" NatuOp="01" Mod="1304">
    <Venc v110=" 123" v120=" 123"/>
  </Op>
  <Op Contrt="4" NatuOp="01" Mod="1304">
    <Venc v110=" 123" v120=" 123"/>
  </Op>
</Cli>
Schema:
from pyspark.sql.types import StructType, StructField, StringType

schema_xml = StructType([
    StructField("@Autorzc", StringType(), True),
    StructField("@Cd", StringType(), True),
    StructField("@Tp", StringType(), True),
    StructField("Op",
        StructType([
            StructField("@Contrt", StringType(), True),
            StructField("@Mod", StringType(), True),
            StructField("@NatuOp", StringType(), True),
            StructField("Venc",
                StructType([
                    StructField("@v110", StringType(), True),
                    StructField("@v120", StringType(), True)
                ])
            )
        ])
    )
])
When I run this code, the output is:
Row(@Autorzc=u'S', @Cd=u'8338', @Tp=u'1', Op=Row(@Contrt=u'2', @Mod=u'1304', @NatuOp=u'01', Venc=Row(@v110=u' 123', @v120=u' 123')))
Row(@Autorzc=u'N', @Cd=u'8568', @Tp=u'2', Op=Row(@Contrt=u'4', @Mod=u'1304', @NatuOp=u'01', Venc=Row(@v110=u' 123', @v120=u' 123')))
The result contains only the last 'Op' in each 'Cli', but I expected to get all 'Op' elements inside each 'Cli' as a list, like this:
Row(@Autorzc=u'S', @Cd=u'8338', @Tp=u'1', Op=[Row(@Contrt=u'1', @Mod=u'1304', @NatuOp=u'01', Venc=Row(@v110=u' 123', @v120=u' 123')), Row(@Contrt=u'2', @Mod=u'1304', @NatuOp=u'01', Venc=Row(@v110=u' 123', @v120=u' 123'))])
Row(@Autorzc=u'N', @Cd=u'8568', @Tp=u'2', Op=[Row(@Contrt=u'3', @Mod=u'1304', @NatuOp=u'01', Venc=Row(@v110=u' 123', @v120=u' 123')), Row(@Contrt=u'4', @Mod=u'1304', @NatuOp=u'01', Venc=Row(@v110=u' 123', @v120=u' 123'))])
Note: in my original file each 'Cli' can have 1, 2, 3...N 'Op' elements, and each 'Op' has only one 'Venc'.
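For clarity, the shape I expect seems to correspond to declaring 'Op' as an array of structs rather than a single struct. A rough sketch of that schema shape (this is only my guess, keeping my '@' attribute-prefix convention; I have not confirmed it works):

from pyspark.sql.types import StructType, StructField, StringType, ArrayType

# Sketch: 'Op' as an ArrayType of structs, so repeated <Op> elements inside one <Cli>
# would end up together in a list instead of only the last one surviving.
schema_xml = StructType([
    StructField("@Autorzc", StringType(), True),
    StructField("@Cd", StringType(), True),
    StructField("@Tp", StringType(), True),
    StructField("Op", ArrayType(StructType([
        StructField("@Contrt", StringType(), True),
        StructField("@Mod", StringType(), True),
        StructField("@NatuOp", StringType(), True),
        StructField("Venc", StructType([
            StructField("@v110", StringType(), True),
            StructField("@v120", StringType(), True)
        ]), True)
    ])), True)
])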
I'm using this documentation.
See also questions close to this topic
-
Why is my if statement always triggering in Python
I'm new to Python and I decided to do a project by myself. In my project there is an if statement that always triggers. Also, I'm still learning PEP 8, so tell me if I violated it.
yn = input('Do you need me to explain the rules. Y/N: ').lower()
if yn == 'y' or 'yes':
    print('I will think of a number between 1 - 100.')
    print('You will guess a number and I will tell you if it is higher or lower than my number.')
    print('This repeats until you guess my number.')
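My current guess is that yn == 'y' or 'yes' is evaluated as (yn == 'y') or ('yes'), and a non-empty string like 'yes' is always truthy, so the branch always runs. A minimal sketch of the membership-based rewrite I am considering:

yn = input('Do you need me to explain the rules. Y/N: ').lower()
# Check membership in the set of accepted answers instead of
# "yn == 'y' or 'yes'", which is always truthy.
if yn in ('y', 'yes'):
    print('I will think of a number between 1 - 100.')
    print('You will guess a number and I will tell you if it is higher or lower than my number.')
    print('This repeats until you guess my number.')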
-
Dialogflow Python client versioning
I am using the Python client for accessing Dialogflow's functionality.
My question is: do
import dialogflow
and
import dialogflow_v2 as dialogflow
have any difference?
In my experience, all the methods are the same. In the samples given by Google,
import dialogflow_v2 as dialogflow
has been used, and I could not see any difference between the two. Please note that I am talking about version v2 of the Python client here, and not the Dialogflow API version.
-
Which device do .in_waiting and .out_waiting refer to in pySerial?
I have a computer that is connected to a serial device at
/dev/ttyUSB0
via a wire with USB2 and microUSB2 connectors. My script runs:
import serial

ser = serial.Serial('/dev/ttyUSB0')
in_buffer = ser.in_waiting
in_data = ser.read(in_buffer)
out_buffer = ser.out_waiting
out_data = ser.read(out_buffer)
Output:
ser = {'is_open': True, 'portstr': '/dev/ttyUSB0', 'name': '/dev/ttyUSB0', '_port': '/dev/ttyUSB0',
       '_baudrate': 9600, '_bytesize': 8, '_parity': 'N', '_stopbits': 1, '_timeout': None,
       '_write_timeout': None, '_xonxoff': False, '_rtscts': False, '_dsrdtr': False,
       '_inter_byte_timeout': None, '_rs485_mode': None, '_rts_state': True, '_dtr_state': True,
       '_break_state': False, '_exclusive': None, 'fd': 6, 'pipe_abort_read_r': 7,
       'pipe_abort_read_w': 8, 'pipe_abort_write_r': 9, 'pipe_abort_write_w': 10}
in_buffer = 0 <class 'int'>
in_data = b'' <class 'bytes'>
out_buffer = 0 <class 'int'>
out_data = b'' <class 'bytes'>
Do in_buffer and out_buffer refer to the number of bytes in the buffer of the computer's UART chip and of the device at /dev/ttyUSB0, respectively? Why do they both have a size of zero bytes?
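My current understanding (which may well be wrong, hence the question) is that both properties describe buffers on the computer's side of the link: in_waiting counts bytes already received from the device but not yet read by my program, and out_waiting counts bytes I have written that have not yet left the host's transmit buffer. A small sketch of how I am probing them, assuming the device answers some request (the 'STATUS?' command is just a placeholder):

import time
import serial

ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=1)

ser.write(b'STATUS?\r\n')   # placeholder request; the real command depends on the device
print(ser.out_waiting)      # bytes still queued in the host-side transmit buffer

time.sleep(0.5)             # give the device time to answer
print(ser.in_waiting)       # bytes received by the host but not yet read by the program
reply = ser.read(ser.in_waiting or 1)
print(reply)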
-
Manipulate XML without destroying the formatting
My task is to add two tags to an existing XML file (or update their values, if they already exist).
At the moment, I use functions from javax.xml to read the file into a Document, manipulate it and write it back. This creates a nasty problem with Windows/Unix line endings (how to prevent XML Transformer from changing line endings) and also destroys all the previous formatting that the writers of the XML applied.
My goal would be to be minimally invasive, i.e. keep the formatting and comments of the XML as much as possible while adding the two tags.
How can this be done with Java?
-
How do I load XML child nodes into a MySQL table?
I am using MySQL to run a webshop, and would like to automate the currency conversion.
I have an XML file that contains all the currencies I need and more. For each currency there are several child nodes that contain the actual exchange rates.
When I use this code:
USE Valuta;
TRUNCATE TABLE Valuta.Valutakurser;
LOAD XML INFILE '/data/valutakurser.xml'
INTO TABLE Valutakurser
ROWS IDENTIFIED BY '<valutakurs>';
The XML:
<?xml version="1.0" encoding="ISO-8859-1" ?>
<valuta>
  <overskrift>Valutaliste</overskrift>
  <oppdatert>23.01.2019 09:00</oppdatert>
  <timestamp>2019-01-23-09.10.58.777452</timestamp>
  <valutakurs>
    <land>USA</land>
    <isokode>US</isokode>
    <kode>USD</kode>
    <enhet>1</enhet>
    <navn>Dollar</navn>
    <overforsel>
      <kjop>8.5552</kjop>
      <salg>8.6202</salg>
      <endring>-0.62</endring>
      <forrige>8.5939</forrige>
      <midtkurs>8.5877</midtkurs>
    </overforsel>
    <seddel>
      <kjop>8.2285</kjop>
      <salg>8.9569</salg>
    </seddel>
  </valutakurs>
  <valutakurs>
    <land>Europeiske Union</land>
The result:
# land, isokode, kode, enhet, navn, overforsel/kjop, overforselsalg, overforselendring, overforselforrige, overforselmidtkurs, seddelkjop, seddelsalg
'USA', 'US', 'USD', '1', 'Dollar', NULL, NULL, NULL, NULL, NULL, NULL, NULL
'Europeiske Union', 'EU', 'EUR', '1', 'Euro', NULL, NULL, NULL, NULL, NULL, NULL, NULL
'Sverige', 'SE', 'SEK', '100', 'Svenske Kroner', NULL, NULL, NULL, NULL, NULL, NULL, NULL
'Danmark', 'DK', 'DKK', '100', 'Danske Kroner', NULL, NULL, NULL, NULL, NULL, NULL, NULL
'Storbritannia', 'GB', 'GBP', '1', 'Britiske Pund', NULL, NULL, NULL, NULL, NULL, NULL, NULL
The child objects are not imported. I can't find any way this can be linked to a column in a table, as this is a multidimensional array. What I would like is to have all the child nodes imported as if they were not child nodes. I have googled and searched but I can't seem to find a single word on how this is done. I can't understand the MySQL docs; can any of you assist?
-
Load XML file, replace all "X" with "Y" and carry on using file
This is the code:
$xml = simplexml_load_file($candidateFile);
if (!$xml) {
    $this->addCritical("ERROR: " . basename($candidateFile));
    return;
}

$ModifiedXml = simplexml_load_string(
    str_replace("\n</DESIGNATION_PLU>", '</DESIGNATION_PLU>', $xml)
);
The last line does not work. All I need is for it to take the loaded XML in $xml, replace some things, and carry on working with the modified XML.
If I'm not clear, or you need more information, just let me know.
Bigger section of code:
$xml = simplexml_load_file($candidateFile);
if (!$xml) {
    $this->addCritical("ERROR: " . basename($candidateFile));
    return;
}

// Before it will write to the new files I need to fix original XML
$candidateFilenameParts = explode('_', basename($candidateFile));
$candidateFileDatetime = str_replace('.xml', '', $candidateFilenameParts[3]);
$headerFilename = "header_$candidateFileDatetime.csv";
$productFilename = "product_$candidateFileDatetime.csv";
$paymentFilename = "payment_$candidateFileDatetime.csv";
$this->addInfo("");
The XML (it looks like some values in the XML initially had endings like "\n"):
<LIGNE>
  <ID_TICKET>123</ID_TICKET>
  <CODE_RAYON>123</CODE_RAYON>
  <CODE_VENDEUR>123</CODE_VENDEUR>
  <PLU>123</PLU>
  <DESIGNATION_PLU>New 1234 ---> Next line
  </DESIGNATION_PLU>
  <CODE_EAN/>
  <TYPE_ARTICLE>Pièce</TYPE_ARTICLE>
  <QUANTITE_VENDUE>1</QUANTITE_VENDUE>
  <UNITE_MESURE>123</UNITE_MESURE>
  <PRIX_UNITAIRE_TTC>123</PRIX_UNITAIRE_TTC>
  <MONTANT_VENTE_TTC>123</MONTANT_VENTE_TTC>
  <SYMB_MONNAIE/>
  <ANNULE>0</ANNULE>
  <CODE_TVA>123</CODE_TVA>
  <TVA_TAUX>123</TVA_TAUX>
  <TYPE_TICKET>123</TYPE_TICKET>
  <POURC_REMISE/>
  <MONTANT_REMISE/>
</LIGNE>
-
What is the fastest way to randomly sample a Hive table in Spark?
I have 5 large tables in Hive. I want to create smaller samples of each of them using PySpark.
I found that there are at least 3 ways to do this:
# best option?
df.rdd.takeSample(withReplacement=False, num=5000).toDF(schema=df.schema)

# does not shuffle
df.limit(5000)

# fraction instead of n_rows
df.sample(withReplacement=False, fraction=0.01)
The last option requires a fraction instead of a specific number of rows, so it depends on the size of the initial dataframe, and df.count() takes a long time.
I need a random sample, and limit does not shuffle (?) the dataframe, so I've chosen df.rdd.takeSample. Is it the best way to take a fast random sample of a dataframe?
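One middle ground I am also considering (just a sketch; the target size and fraction below are placeholders, not values I have tuned) is to oversample with a rough fraction and cap the result with limit, so no exact count is needed:

# Sketch: sample by a generous fraction, then trim to the target row count,
# so the exact table size never has to be computed.
target_rows = 5000
sample_df = (df.sample(withReplacement=False, fraction=0.01, seed=42)
               .limit(target_rows))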
-
Wait for Spark server execution in tests
I am trying to write a simple test for my app. This test requires another server (built around Spark).
I run this server via:
public MyServer(int port, Route route) {
    this.port = port;
    this.route = route;
    this.executorService = Executors.newSingleThreadExecutor();
    this.executorService.submit(this);
}
And the run method:
Spark.port(this.port);
Spark.post("/myServer", "application/json", route);
My custom implementation of spark.Route is very simple and looks like this:
public class MyRoute implements Route {
    @Override
    public Object handle(Request request, Response response) throws Exception {
        return "some response";
    }
}
The problem I'm facing is that when I run my test like:
MyServer myServer = new MyServer("1111", new MyRoute());
My test ends right after:
Spark.post("/myServer", "application/json", route);
All I need is for the server to keep running until I stop it.
I tried setting the thread's daemon option to true, but that did nothing.
How can I start the Spark server and be sure that it will keep running until I call
Spark.stop();
-
pyspark: Unable to change the format of timestamp with granularity up to microseconds using pyspark.sql.functions
Say I have a dataframe as below (the real dataframe has the date column as one of 20 columns and a good volume of data; to keep the problem to the point, I am considering here only a dataframe with the date column and a product column, with a single row of data in it):
df = spark.createDataFrame([('12/21/2015 23:21:20.689523',"product1")], ['dt',"product_name"])
Now my objective is to change the timestamp format to "yyyy/MM/dd HH:mm:ss.SSSSSS" and save it as a timestamp column (not a string column) in the same dataframe. In doing that I am facing the following issues.
The date_format function does not directly recognize the format and returns null:
df.withColumn("new_datetime",date_format("dt","yyyy/MM/dd HH:mm:ss.SSSSSS" ))
To overcome this I tried to_timestamp and applying date_format over that, but only granularity up to seconds can be achieved with to_timestamp. The microseconds in the 'dt' column cannot be included, since it cannot recognize them (if I include '.SSSSSS' as well, it returns null):
df.withColumn("new_datetime",to_timestamp("dt","MM/dd/yyyy HH:mm:ss"))
As another option, I tried to cast the column to "timestamp", but that also ended in vain.
In the end I achieved it through Python's datetime module:
df = df.withColumn('row_index', monotonically_increasing_id())

def convert_fmt(date_time):
    return datetime.strptime(datetime.strptime(date_time, "%m/%d/%Y %H:%M:%S.%f").strftime("%Y/%m/%d %H:%M:%S.%f"), "%Y/%m/%d %H:%M:%S.%f")

df_new = df.rdd.map(lambda x: (x.asDict()["row_index"], convert_fmt(x.asDict()["dt"]))).toDF(["index", "dt_new"])
df = df_new.join(df, df_new.index == df.row_index).drop("row_index", "index", "dt")
But this is an indirect approach. I would like to know how we can change the timestamp format of a date column with granularity up to microseconds and save it back to the dataframe as a timestamp column using core Spark APIs and functions.
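For reference, the same datetime-based workaround can presumably be written as a plain column UDF instead of the RDD round trip and join (still indirect, but shorter); a sketch:

from datetime import datetime
from pyspark.sql.functions import udf
from pyspark.sql.types import TimestampType

# Sketch: parse the string with Python's datetime (which keeps microseconds)
# and return it directly as a timestamp column.
parse_us = udf(lambda s: datetime.strptime(s, "%m/%d/%Y %H:%M:%S.%f") if s else None,
               TimestampType())

df = df.withColumn("dt_new", parse_us("dt")).drop("dt")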
-
Writing results of a SQL query to a dataframe with Scala fails in Databricks
Just running this Spark SQL query in Databricks works fine:
%sql
select CONCAT(`tsArr[1]`,"-", `tsArr[0]`,"-", `tsArr[2]`," ", `tsArr[3]`) as time,
       cast (context._function as string) as funct,
       cast (context._param as string) as param,
       cast(context._value as string) as value
from clickstreamDF lateral view explode(Context) as context
this outputs:
time                 funct  param         value
11-27-2017 08:20:33  Open   location      3424
11-27-2017 08:20:33  Open   Company Id    testinc
11-27-2017 08:20:33  Open   Channel Info  1
11-27-2017 08:20:33  Open   UserAgent     jack
11-27-2017 08:20:33  Open   Language      english
But when I want to put the query result in a dataframe like this:
%scala
val df_header = spark.sql(s"select CONCAT(`tsArr[1]`,"-", `tsArr[0]`,"-", `tsArr[2]`," ", `tsArr[3]`) as time, cast (context._function as string) as funct, cast (context._param as string) as param, cast(context._value as string) as value from clickstreamDF lateral view explode(Context) as context")
df_header.createOrReplaceTempView("clickstreamDF")
then it fails. It says:
error: ')' expected but string literal found.
I'm guessing it has to do with the "-" and " ". I have tried replacing or extending with '' and `` or leaving the "" out completely, but without result. What am I doing wrong?
regards,
D.
-
How to split a column with Scala based on one character and a space
I want to pull apart a column "_time" that contains a datetime (as fieldtype = string)
The dates inside the "_time" column look like this: 27-11-2017 08:20:33. So I thought this could work:
df.withColumn("col1", split(col("_time"), "\\-").getItem(0))
  .withColumn("col2", split(col("_time"), "\\-").getItem(1))
  .withColumn("col3", split(col("_time"), "\\-").getItem(2))
  .withColumn("col4", split(col("_time"), "\\' '").getItem(3))
  .show()
but this results in:
-------------------+----+----+-------------+----+
              _time|col1|col2|         col3|col4|
-------------------+----+----+-------------+----+
27-11-2017 08:20:33|  27|  11|2017 08:20:33|null|
27-11-2017 08:20:35|  27|  11|2017 08:20:35|null|
27-11-2017 08:20:35|  27|  11|2017 08:20:35|null|
So it seems that it doesn't do anything with the space. I have tried using this:
.withColumn("col4", split(col("_time"), "\\' '")
but that also doesn't work (it gives the same result).
Any suggestions?
Regards
-
Spark Streaming Error : java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI:
I'm trying to stream data from a Kafka Avro topic.
Below is my code snippet:
val sparkStreamingContext = new StreamingContext(sc, Durations.seconds(60))

val brokers = "Broker info"
val schemaRegistryURL = "URL schema registry "
val subjectValueName = "topicname" + "-value"
val restService = new RestService(schemaRegistryURL)
val valueRestResponseSchema = restService.getLatestVersion(subjectValueName)
val parser = new Schema.Parser
val topicValueAvroSchema: Schema = parser.parse(valueRestResponseSchema.getSchema)

val kafkaParam = new mutable.HashMap[String, String]()
kafkaParam.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
kafkaParam.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
kafkaParam.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
kafkaParam.put(ConsumerConfig.GROUP_ID_CONFIG, "streaming-kafka")
kafkaParam.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
kafkaParam.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")

val topicList = List("topicname")
val messageStream = KafkaUtils.createDirectStream(sparkStreamingContext,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topicList, kafkaParam))

val TRANSACTION_SCHEMA: StructType = SchemaConverters.toSqlType(topicValueAvroSchema).dataType.asInstanceOf[StructType]

messageStream.foreachRDD { rdd =>
  val streamData = spark.read.schema(TRANSACTION_SCHEMA).avro(rdd.map(x => x.value()).toString())
  streamData.repartition(1).write.format("com.databricks.spark.avro").mode("Append").saveAsTable("tablename")
}

sparkStreamingContext.start()
sparkStreamingContext.awaitTermination()
But I get the error below; can someone help me fix this?
java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: MapPartitionsRDD[75] at map at <console>:54