Druid.io: update/override existing data via streams from Kafka (Druid Kafka indexing service)
I'm loading streams from Kafka using the Druid Kafka indexing service.
But the data I upload keeps changing, so I need to reload it while avoiding duplicates and collisions with data that was already ingested.
I have researched the docs about Updating Existing Data in Druid.
But everything there is about Hadoop Batch Ingestion and Lookups.
Is it possible to update existing Druid data while streaming from Kafka?
In other words, I need to overwrite old values with new ones using the Kafka indexing service (streams from Kafka).
Maybe there is some kind of setting to overwrite duplicates?
See also questions close to this topic
-
onProgressUpdate doesn't receive progress properly
Hi, I am downloading multiple files one after the other in an AsyncTask, and I am updating the progress in a notification.
Here is the relevant part of my AsyncTask code:
@Override
protected String doInBackground(String... f_url) {
    ArrayList<Download> list = new ArrayList<>();
    list.add(new Download("https://sample-videos.com/video/mp4/720/big_buck_bunny_720p_20mb.mp4", "video1.mp4"));
    //more dummy data
    for (int i = 0; i < list.size(); i++) {
        Download download = list.get(i);
        int count;
        try {
            URL url = new URL(download.url);
            URLConnection conection = url.openConnection();
            conection.connect();
            int lenghtOfFile = conection.getContentLength();
            InputStream input = new BufferedInputStream(url.openStream(), 8192);
            OutputStream output = new FileOutputStream(Environment.getExternalStorageDirectory() + "/" + download.name);
            byte data[] = new byte[1024];
            long total = 0;
            while ((count = input.read(data)) != -1) {
                total += count;
                int percentage = (int) ((total * 100) / lenghtOfFile);
                publishProgress(download.name, String.valueOf(i + 1), String.valueOf(list.size()), String.valueOf(percentage));
                output.write(data, 0, count);
            }
            output.flush();
            output.close();
            input.close();
        } catch (Exception e) {
        }
    }
    return null;
}

@Override
protected void onProgressUpdate(String... values) {
    super.onProgressUpdate(values);
    updateNotification(values[0], Integer.parseInt(values[1]), Integer.parseInt(values[2]), Integer.parseInt(values[3]));
}
Let's come to my actual problem. When I debug at this line
int percentage = (int)((total * 100) / lenghtOfFile);
it shows me the correct percentage. But when I receive these values in onProgressUpdate, it shows me "0". Because of this I cannot update my notification.
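For comparison, here is a minimal sketch (the class name and the placeholder values are illustrative, not taken from the question) of publishing the percentage as a typed Integer via AsyncTask<String, Integer, String>, which avoids converting progress values to Strings and parsing them back:

// nested e.g. inside an Activity; android.os.AsyncTask referenced by its full name
private static class DownloadTask extends android.os.AsyncTask<String, Integer, String> {

    @Override
    protected String doInBackground(String... urls) {
        // ... the download loop from the question would go here ...
        long total = 512;          // placeholder for bytes read so far
        int lengthOfFile = 1024;   // placeholder for conection.getContentLength()
        int percentage = (int) ((total * 100) / lengthOfFile);
        publishProgress(percentage); // the int is auto-boxed to Integer
        return null;
    }

    @Override
    protected void onProgressUpdate(Integer... values) {
        int percentage = values[0]; // already numeric, no Integer.parseInt needed
        // update the notification with this value
    }
}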
-
Parsing data from a JSON response
I can't parse data from the response.
Here is my snippet:
Response = '{"sys":"[{\"division\":\"Barisal\",\"district\":\"Pirojpur Zila\",\"upazilla\":\"Mathbaria Upazila\"},{\"division\":\"Barisal\",\"district\":\"Jhalokati Zila\",\"upazilla\":\"Rajapur Upazila\"},{\"division\":\"Barisal\",\"district\":\"Barguna Zila\",\"upazilla\":\"Amtali Upazila\"},{\"division\":\"Barisal\",\"district\":\"Barisal Zila\",\"upazilla\":\"Banari Para Upazila\"},{\"division\":\"Barisal\",\"district\":\"Pirojpur Zila\",\"upazilla\":\"Pirojpur Sadar Upazila\"},{\"division\":\"Barisal\",\"district\":\"Barisal Zila\",\"upazilla\":\"Muladi Upazila\"}]"}'; JSONObject json = new JSONObject(response); JSONArray userdetails = json.getJSONArray("sys"); for (int i=0; i<userdetails.length(); i++) { JSONObject user = userdetails.getJSONObject(i); String division = user.getString("division"); String district = user.getString("district"); String upazilla = user.getString("upazilla"); }
I debugged the code. It stops when it tries to check the userdetails length.
Any ideas?
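For reference, a minimal sketch with org.json, under the assumption (suggested by the escaped quotes in the response above) that "sys" arrives as a JSON array encoded inside a string rather than as a nested array:

JSONObject json = new JSONObject(response);
// "sys" is a String whose content is itself a JSON array,
// so it has to be parsed into a JSONArray in a second step
JSONArray userdetails = new JSONArray(json.getString("sys"));
for (int i = 0; i < userdetails.length(); i++) {
    JSONObject user = userdetails.getJSONObject(i);
    String division = user.getString("division");
    String district = user.getString("district");
    String upazilla = user.getString("upazilla");
}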
-
Rewrite code from logcat output to TextView output
I would like to rewrite the following code to output in a TextView instead of the logcat. How do I do that?
public static void printUsageStats(List<UsageStats> usageStatsList, Context context) {
    for (UsageStats u : usageStatsList) {
        PackageManager pm = context.getPackageManager();
        PackageInfo foregroundAppPackageInfo = null;
        try {
            foregroundAppPackageInfo = pm.getPackageInfo(u.getPackageName(), 0);
        } catch (PackageManager.NameNotFoundException e) {
            e.printStackTrace();
        }
        if (u.getTotalTimeInForeground() > 0) {
            Log.d(TAG, "Pkg: " + foregroundAppPackageInfo.applicationInfo.loadLabel(pm).toString() + "\t"
                    + String.valueOf(((u.getTotalTimeInForeground() / 60000) / 60) / 24) + "d "
                    + String.valueOf(((u.getTotalTimeInForeground() / 60000) / 60) % 24) + "hr "
                    + String.valueOf((u.getTotalTimeInForeground() / 60000) % 60) + "m "
                    + String.valueOf((u.getTotalTimeInForeground() / 1000) % 60) + "s");
        }
    }
}
Thanks
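As a rough sketch (the extra TextView parameter and the shortened label are assumptions, not part of the original code), the Log.d output could be collected into a StringBuilder and shown in a TextView instead:

public static void printUsageStats(List<UsageStats> usageStatsList, Context context, TextView output) {
    StringBuilder sb = new StringBuilder();
    for (UsageStats u : usageStatsList) {
        if (u.getTotalTimeInForeground() > 0) {
            // build the same "Pkg: ..." line as in the Log.d call above,
            // here shortened to the package name for brevity
            sb.append("Pkg: ").append(u.getPackageName()).append('\n');
        }
    }
    // must be called on the UI thread
    output.setText(sb.toString());
}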
-
Kafka State Store Processor Delete: how to use?
I've defined a class extending Processor and I'm using a KeyValueStore to temporarily store some messages before sending them to the sink topic. In particular, I receive a set of fragmented messages on a source topic and, once all of them have been received, I have to assemble them and send the concatenated message to the sink topic. In the
Processor.process()
method, once I've sent the message with forward(), I want to delete the message from the state store by means of the delete(K key) method. In the processor I've created the state store with:

StoreBuilder<KeyValueStore<byte[], String>> storeBuilder =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("message-store"),
        Serdes.ByteArray(),
        Serdes.String());
The problem is that the removal does not happen, and when I send another message with the same key, I still receive the previous messages in the value.
Code:
kvStore = (KeyValueStore<byte[], String>) this.context.getStateStore("message-store");
kvStore.delete((byte[]) key);
The put and the get of the state store work properly. Is there anything wrong with this approach?
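For reference, here is a minimal sketch (the class name and the completeness check are illustrative, not from the question) of forwarding and then deleting an entry inside process(), using the same byte[]/String store created above:

import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class AssembleProcessor extends AbstractProcessor<byte[], String> {

    private KeyValueStore<byte[], String> kvStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        super.init(context);
        kvStore = (KeyValueStore<byte[], String>) context.getStateStore("message-store");
    }

    @Override
    public void process(byte[] key, String fragment) {
        String previous = kvStore.get(key);
        String assembled = (previous == null) ? fragment : previous + fragment;

        if (isComplete(assembled)) {            // hypothetical completeness check
            context().forward(key, assembled);  // emit the concatenated message
            kvStore.delete(key);                // remove the entry once it has been forwarded
        } else {
            kvStore.put(key, assembled);        // keep accumulating fragments
        }
    }

    // Placeholder for the real "all fragments received" condition.
    private boolean isComplete(String message) {
        return message != null;
    }
}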
-
Kafka Streams not fetching all records
I have a Java Kafka Streams application that reads from a topic, does some filtering and transformations, and writes the data back to a different Kafka topic. I print the stream object at every step. I noticed that if I send more than a few dozen records to the input topic, some records are not consumed by my Kafka Streams application.
When using kafka-console-consumer.sh to consume from the input topic, I do receive all records.
I'm running Kafka 1.0.0 with one broker and one partition topic.
Any idea why?
public static void main(String[] args) {
    final String bootstrapServers = System.getenv("KAFKA");
    final String inputTopic = System.getenv("INPUT_TOPIC");
    final String outputTopic = System.getenv("OUTPUT_TOPIC");
    final String gatewayTopic = System.getenv("GATEWAY_TOPIC");

    final Properties streamsConfiguration = new Properties();
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "PreProcess");
    streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "PreProcess-client");
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 300L);

    final StreamsBuilder builder = new StreamsBuilder();
    final KStream<String, String> textLines = builder.stream(inputTopic);
    textLines.print();

    StreamsTransformation streamsTransformation = new StreamsTransformation(builder);
    KTable<String, Gateway> gatewayKTable = builder.table(gatewayTopic, Consumed.with(Serdes.String(), SerdesUtils.getGatewaySerde()));
    KStream<String, Message> gatewayIdMessageKStream = streamsTransformation.getStringMessageKStream(textLines, gatewayKTable);
    gatewayIdMessageKStream.print();

    KStream<String, FlatSensor> keyFlatSensorKStream = streamsTransformation.transformToKeyFlatSensorKStream(gatewayIdMessageKStream);
    keyFlatSensorKStream.to(outputTopic, Produced.with(Serdes.String(), SerdesUtils.getFlatSensorSerde()));
    keyFlatSensorKStream.print();

    KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
    streams.cleanUp();
    streams.start();

    // Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        streams.close();
    }));
}
-
Kafka 0.10 + Spark Streaming import problems
I'm trying to integrate Spark Streaming (Spark version: 2.2.1) with Kafka (version: kafka_2.10-0.10.0.1). Here's the code that sets the Kafka params and creates the direct stream:
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkKafka10WordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(30));

final JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
Dependencies:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>2.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>2.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.0.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.2.1</version>
</dependency>
-
What is the difference between data lake and data warehouse?
I know about the concept of a data lake, but I still can't see why we should use a data lake in a next-generation data architecture.
-
How do I take advantage of my local resources using Spark in local mode?
I've been using Apache Spark to write a desktop app which lets you tamper with data interactively. I've recently started reading "Learning Spark", and in it the author says that in local mode (when the master is set to local) Spark only uses one thread.
How can I take advantage of all of the cores in my computer without running a full-blown Spark cluster on it?
I'm using Java / Kotlin.
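For reference, a minimal Java sketch (the app name and the tiny job are placeholders): the local master URL accepts a thread count, so local[4] runs with four threads and local[*] uses one worker thread per logical core.

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class LocalSparkExample {
    public static void main(String[] args) {
        // local[*] = one worker thread per logical core; local[4] would use exactly four
        SparkConf conf = new SparkConf()
                .setMaster("local[*]")
                .setAppName("LocalSparkExample");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // trivial job just to show the context is usable
        long count = sc.parallelize(Arrays.asList(1, 2, 3, 4)).count();
        System.out.println("count = " + count);

        sc.stop();
    }
}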
-
Cleaning unnecessary variables in big data using R
I have a data set which contains 163 columns (variables) and 199,566 rows (observations). How can I eliminate redundant variables? Can I do this by using the normal distribution?
-
How we ensure that
I imported a flat file into a database table using SQL Server 2014 and now need to validate whether all of the file's data exists in the tables. As part of the test process we need to validate that every record was loaded successfully, but we have no column by which we can match the data. The number of columns is equal on both sides, but the number of rows is different: the SQL table has more data.
-
Generate one Excel file with two tabs from two input files in Pentaho
I am trying to develop a job that is able to generate one Excel file with two tabs. Basically, what I want to achieve is:
tab 1 is based on input file 1
tab 2 is based on input file 2
I have two input files which contain different queries, but the end result should be one Excel file with two tabs: one tab for the first input and a second tab for input 2.
-
How to make a High Level Design Document for ETL
I would be very thankful if someone could share their experience designing ETL.
I am trying to prepare a High Level Design Document. This is the first time I am architecting an ETL solution.
Could someone please help me with the things we must take care of while designing?
The flat file comes from the vendor every week, and we need to load it and generate a report.
Thanks a lot
-
Druid Superset with Hive integration
I am new to Druid. I already have datasets/tables in Hive, and I want to fetch data from Hive into Druid/Superset. Please help me with how to establish a connection between Hive and Druid and how I can fetch data from Hive into Druid.
-
Connecting Druid with MySQL
I finished setting up Druid on my computer by following the quickstart guide. I also set up the MySQL extension by following the MySQL Metadata Store documentation.
But when I sign in to MySQL with
mysql -u druid -p dirud
I only get the druid table. I don't see any of the data that I imported with the following example command:
curl -X 'POST' -H 'Content-Type:application/json' -d @quickstart/wikiticker-index.json localhost:8090/druid/indexer/v1/task
I saw a Success status on the Druid console at http://localhost:8090/console.html as well.
The following is my common.runtime.properties for the MySQL setup:
druid.metadata.storage.type=mysql
druid.metadata.storage.connector.connectURI=jdbc:mysql://127.0.0.1:3306/druid
druid.metadata.storage.connector.user=druid
druid.metadata.storage.connector.password=diurd
Is something wrong with my setup?
-
What caused the Druid tasks to fail?
I set up a Druid cluster (10 nodes) and am ingesting Kafka data using the indexing service. However, I found that many of the tasks failed, like below, although some data does exist in segments; I am not sure whether all the data has been pushed into the segments. failed task lists
Besides that, I looked at the logs of some failed tasks and found that there are no fatal error messages. I posted the log file; please help me figure out what caused the task to fail. Thanks so much. one log of failed tasks
There are two questions I want to ask: one is how to confirm that all consumed data has been pushed into the segments, and the other is what caused the task failure.