Spark Streaming job fails with ReceiverDisconnectedException
I have a Spark Streaming job which captures near-real-time data from Azure Event Hubs and runs 24/7. The job fails at least two times a day with the error below. If I google the error, the Microsoft docs tell me: 'This exception is thrown if two or more PartitionReceiver instances connect to the same partition with different epoch values.' I'm not worried about data loss, because Spark checkpointing will automatically take care of the data when I restart the job, but my question is why the Spark Streaming job fails 2-3 times a day with the same error.
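For reference, a minimal sketch of the read side of such a job, assuming the azure-eventhubs-spark Structured Streaming connector (the connection string, consumer group, sink, and checkpoint path are placeholders, not my actual values):

import org.apache.spark.eventhubs.{EventHubsConf, EventPosition}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("eventhub-stream").getOrCreate()

// A consumer group dedicated to this one job/query, so no second receiver
// competes for the same partition with a different epoch.
val ehConf = EventHubsConf("<event-hub-connection-string>")
  .setConsumerGroup("spark-streaming-job")
  .setStartingPosition(EventPosition.fromEndOfStream)

val events = spark.readStream
  .format("eventhubs")
  .options(ehConf.toMap)
  .load()

val query = events
  .selectExpr("cast(body as string) as body", "enqueuedTime")
  .writeStream
  .format("parquet")                                          // placeholder sink
  .option("checkpointLocation", "/checkpoints/eventhub-job")  // placeholder path
  .option("path", "/data/eventhub-output")                    // placeholder path
  .start()

query.awaitTermination()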
Has anybody faced the same issue? Is there any solution or workaround available for this? Any help would be much appreciated.
See also questions close to this topic
-
How to avoid nested map calls in Spark?
I have a list of transactions where users take a board from one station to another. This is an array of arrays, called trans, with the columns Board, User, Station, Action, Time:
[ ['1', 'Ana' , 'Tribeca' , 'check_out', '1:00pm'],
  ['1', 'Ana' , 'Soho'    , 'park'     , '2:00pm'],
  ['1', 'Bob' , 'Soho'    , 'check_out', '3:00pm'],
  ['1', 'Bob' , 'Chelsea' , 'park'     , '4:00pm'],
  ['2'...] ]
(E.g. Board '1' was checked out by 'Ana' in 'Tribeca' at '1:00pm', parked in Soho at '2:00pm', and then checked out by 'Bob').
With this line of code, I group each board to its transactions:
board_groups = (trans
    .map(lambda oneTrans: (oneTrans[0], [oneTrans[1], oneTrans[2], oneTrans[3], oneTrans[4]]))
    .groupByKey()
    .mapValues(list))
giving:
('1', [ ['Ana' , 'Tribeca' , 'check_out', '1:00pm'],
        ['Ana' , 'Soho'    , 'park'     , '2:00pm'],
        ['Bob' , 'Soho'    , 'check_out', '3:00pm'],
        ['Bob' , 'Chelsea' , 'park'     , '4:00pm'] ]),
('2', ...)
How could I achieve this:
('1', ('Ana', [ ['Tribeca', 'check_out', '1:00pm'],
                ['Soho'   , 'park'     , '2:00pm'] ]),
      ('Bob', [ ['Soho'   , 'check_out', '3:00pm'],
                ['Chelsea', 'park'     , '4:00pm'] ])),
('2'...)
using Spark's methods?
Since each value of board_groups is an array of arrays, I tried to use mapValues on it, so that for each array within each value I could map to make each user a key and then group the users together. However, I couldn't do this because it involved nested parallelization calls.
-
Error when trying to apply Lambda and Alpha arguments when using ml_linear_regression in Spark (using R)
I am trying to run a regression with alpha = 1 and lambda = 0, but I keep getting an error. I will write down the whole code and packages.
library(sparklyr)
library(dplyr, warn.conflicts = FALSE)
library(ggplot2)
library(gridExtra)

# Install the `glmnet` package in your user library
if(system("hostname", intern=TRUE) == "gbef4001.hpc.wvu.edu"){
    Sys.setenv(SPARK_HOME = "/opt/cloudera/parcels/CDH/lib/spark/")
    sc <- spark_connect(
        master = "yarn-client",
        source = "/opt/cloudera/parcels/CDH/lib/spark/bin/spark-submit",
        version="2.4.0")
} else {
    sc <- spark_connect(master = "local")
}

path.data <- paste("file://", getwd(), "/prostate.csv", sep = "")
Then I read the data into an R data frame and copy it to Spark:
prostate_df <- read.csv(path.data)
prostate_sdf <- sdf_copy_to(sc, prostate_df, "prostate_sdf", overwrite = TRUE)
Then I try to run a regression using the lambda and alpha arguments:
prostate_sdf %>% ml_linear_regression(lpsa ~ lcavol + lweight + age + lbph + svi + lcp + gleason + pgg45, alpha=1, lambda=0)
But I keep getting the error below
Error: 2 components of `...` were not used.
We detected these problematic arguments:
* `alpha`
* `lambda`
Did you misspecify an argument?
Any idea how to remedy this? Thanks.
-
Apply a method designed for a Spark Dataset on subgroups instead
I have a method that expects a Spark Dataset of a custom Object as an input:
def myAlgorithm(ds : Dataset[CustomObject]) { ... }
However, I now have to use this algorithm on subgroups of this dataset.
If I apply a .groupBy() method on this Dataset, I end up having to refactor all of myAlgorithm to fit the new structure of the data, and that might be quite time consuming. I am also worried about the performance of the algorithm once it is refactored (each subgroup can be quite massive too).
The most straightforward solution I found was to iterate through the keys and filter my dataset:
val keys = ds.map( obj => obj.index ).distinct.collect()
val result = for (key <- keys) yield {
    val filteredDS = ds.filter( obj => obj.index == key)
    val output = myMLAlgorithm(filteredDS)
}
However, this solution is highly inefficient and far from fast enough for my needs. I also explored the idea of using Futures in the for loop (based on this video: https://www.youtube.com/watch?v=WZ5TJUYWyU0):
val keys = ds.map( obj => obj.index ).distinct.collect()
val futures = for (key <- keys) yield {
    val filteredDS = ds.filter( obj => obj.index == key)
    val output = Future { myMLAlgorithm(filteredDS) }
}
val result = futures.foreach(f => Await.result(f, Duration.Inf))
It's better, but still not efficient enough for my needs.
What is the best practice / most efficient way of dealing with that situation?
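For what it's worth, a bounded-parallelism variant of the Futures idea is sketched below; the fixed pool size, the cache/unpersist calls, and reusing myMLAlgorithm unchanged are assumptions, not a proven recipe:

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// Bound how many per-key Spark jobs run concurrently instead of launching
// one Future per key all at once (a pool size of 8 is an arbitrary guess).
implicit val ec: ExecutionContext =
  ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(8))

val keys = ds.map(obj => obj.index).distinct.collect()

val futures = for (key <- keys) yield Future {
  val filteredDS = ds.filter(obj => obj.index == key).cache() // reused inside the algorithm
  try myMLAlgorithm(filteredDS)
  finally filteredDS.unpersist()
}

val results = futures.map(f => Await.result(f, Duration.Inf))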
-
How to use an external trigger to stop Spark Structured Streaming?
For example, I am using Spark Structured Streaming and I want to check whether a stop file exists; if it does, I can exit my program. I can do something like:
val query = SparkSession...load.writeStream.foreachBatch {
    if (stop file exist) exit(0)
    // do some processing here
}.start()
query.awaitTermination()
However, this is only triggered when new rows are appended to the query's input table. If there are no new rows, the stop file cannot be detected even if it exists. Any better idea for implementing this trigger?
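One pattern that does not depend on new rows arriving is to poll for the stop file from the driver thread, outside foreachBatch, and stop the query from there. A rough sketch, reusing the query started above (the stop-file path and the 10-second poll interval are placeholders):

import java.nio.file.{Files, Paths}

// query is the StreamingQuery returned by .start() above
while (query.isActive) {
  if (Files.exists(Paths.get("/tmp/stop"))) {
    query.stop()                  // graceful stop, independent of incoming data
  } else {
    query.awaitTermination(10000) // wait up to 10s for termination, then re-check
  }
}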
-
Puzzled by writing to Kafka with Spark Streaming
I'm working on a Spark Streaming project that transports data from one Kafka cluster to another, and I'm puzzled by the writing-to-Kafka part. Following "Spark Streaming - read and write on Kafka topic", I wrapped the producer and send it to all executors. It works in local mode, but fails in yarn mode, no matter whether yarn-client or yarn-cluster.
Here is my producer-wrapper code:
import org.apache.kafka.clients.producer.{KafkaProducer, _}

// kafkaproducer wrapper
class KProducer(createProducer: () => Producer[String,String]) extends Serializable {

  lazy val producer: Producer[String,String] = createProducer()

  // (topic, value)
  def aksend(topic: String, value: String): Unit = {
    producer.send(
      new ProducerRecord[String,String](topic, value),
      new Callback {
        override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {}
      })
  }
}

object KProducer {
  def apply(config: java.util.Properties): KProducer = {
    val createFunc = () => new KafkaProducer[String,String](config)
    new KProducer(createFunc)
  }
}
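This is roughly how the wrapper is wired into the stream, following the broadcast pattern from the linked answer (the DStream element type, topic name, and producer properties are placeholders):

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

// Broadcast the serializable wrapper; because `producer` is a lazy val,
// each executor builds its own KafkaProducer on first use instead of
// receiving one serialized from the driver.
def writeToKafka(ssc: StreamingContext,
                 stream: DStream[String],
                 props: java.util.Properties,
                 topic: String): Unit = {
  val producerBc: Broadcast[KProducer] = ssc.sparkContext.broadcast(KProducer(props))
  stream.foreachRDD { rdd =>
    rdd.foreachPartition { records =>
      records.foreach(value => producerBc.value.aksend(topic, value))
    }
  }
}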
I also created a simple demo on GitHub here.
Can anybody help me with this problem, please?
-
Error when running spark-submit: java.lang.NoClassDefFoundError: kafka/common/TopicAndPartition
spark-submit --jars spark-streaming-kafka-0-8_2.11-2.4.4.jar direct_approach.py localhost:9092 new_topic
I ran the command above, but I don't know why I got this error. I spent hours trying to fix it but could not. I am using Spark 2.4.4 and Scala 2.13.0. I tried setting spark.executor.memory and spark.driver.memory in my Spark configuration file, but I still could not solve the problem.
Here is the error:
(tutorial-env) (base) harry@harry-badass:~/Desktop/twitter_project$ spark-submit --jars spark-streaming-kafka-0-8_2.11-2.4.4.jar direct_approach.py localhost:9092 new_topic 19/12/14 14:27:23 WARN Utils: Your hostname, harry-badass resolves to a loopback address: 127.0.1.1; using 220.149.84.46 instead (on interface enp4s0) 19/12/14 14:27:23 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address WARNING: An illegal reflective access operation has occurred WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/harry/tutorial-env/lib/python3.7/site-packages/pyspark/jars/spark-unsafe_2.11-2.4.4.jar) to method java.nio.Bits.unaligned() WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations WARNING: All illegal access operations will be denied in a future release 19/12/14 14:27:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 19/12/14 14:27:24 INFO SparkContext: Running Spark version 2.4.4 19/12/14 14:27:24 INFO SparkContext: Submitted application: PythonStreamingDirectKafkaWordCount 19/12/14 14:27:24 INFO SecurityManager: Changing view acls to: harry 19/12/14 14:27:24 INFO SecurityManager: Changing modify acls to: harry 19/12/14 14:27:24 INFO SecurityManager: Changing view acls groups to: 19/12/14 14:27:24 INFO SecurityManager: Changing modify acls groups to: 19/12/14 14:27:24 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(harry); groups with view permissions: Set(); users with modify permissions: Set(harry); groups with modify permissions: Set() 19/12/14 14:27:24 INFO Utils: Successfully started service 'sparkDriver' on port 41699. 19/12/14 14:27:24 INFO SparkEnv: Registering MapOutputTracker 19/12/14 14:27:24 INFO SparkEnv: Registering BlockManagerMaster 19/12/14 14:27:24 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information 19/12/14 14:27:24 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up 19/12/14 14:27:24 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-2067d2bb-4b7c-49d8-8f02-f20e8467b21e 19/12/14 14:27:24 INFO MemoryStore: MemoryStore started with capacity 434.4 MB 19/12/14 14:27:24 INFO SparkEnv: Registering OutputCommitCoordinator 19/12/14 14:27:24 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041. 19/12/14 14:27:24 INFO Utils: Successfully started service 'SparkUI' on port 4041. 19/12/14 14:27:24 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://220.149.84.46:4041 19/12/14 14:27:24 INFO SparkContext: Added JAR file:///home/harry/Desktop/twitter_project/spark-streaming-kafka-0-8_2.11-2.4.4.jar at spark://220.149.84.46:41699/jars/spark-streaming-kafka-0-8_2.11-2.4.4.jar with timestamp 1576301244901 19/12/14 14:27:24 INFO Executor: Starting executor ID driver on host localhost 19/12/14 14:27:25 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 46637. 
19/12/14 14:27:25 INFO NettyBlockTransferService: Server created on 220.149.84.46:46637 19/12/14 14:27:25 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy 19/12/14 14:27:25 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 220.149.84.46, 46637, None) 19/12/14 14:27:25 INFO BlockManagerMasterEndpoint: Registering block manager 220.149.84.46:46637 with 434.4 MB RAM, BlockManagerId(driver, 220.149.84.46, 46637, None) 19/12/14 14:27:25 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 220.149.84.46, 46637, None) 19/12/14 14:27:25 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 220.149.84.46, 46637, None) Exception in thread "Thread-5" java.lang.NoClassDefFoundError: kafka/common/TopicAndPartition at java.base/java.lang.Class.getDeclaredMethods0(Native Method) at java.base/java.lang.Class.privateGetDeclaredMethods(Class.java:3139) at java.base/java.lang.Class.privateGetPublicMethods(Class.java:3164) at java.base/java.lang.Class.getMethods(Class.java:1861) at py4j.reflection.ReflectionEngine.getMethodsByNameAndLength(ReflectionEngine.java:345) at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:305) at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326) at py4j.Gateway.invoke(Gateway.java:274) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.base/java.lang.Thread.run(Thread.java:844) Caused by: java.lang.ClassNotFoundException: kafka.common.TopicAndPartition at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:466) at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:563) at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:496) ... 12 more ERROR:root:Exception while sending command. 
Traceback (most recent call last): File "/home/harry/tutorial-env/lib/python3.7/site-packages/pyspark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1159, in send_command raise Py4JNetworkError("Answer from Java side is empty") py4j.protocol.Py4JNetworkError: Answer from Java side is empty During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/home/harry/tutorial-env/lib/python3.7/site-packages/pyspark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 985, in send_command response = connection.send_command(command) File "/home/harry/tutorial-env/lib/python3.7/site-packages/pyspark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1164, in send_command "Error while receiving", e, proto.ERROR_ON_RECEIVE) py4j.protocol.Py4JNetworkError: Error while receiving Traceback (most recent call last): File "/home/harry/Desktop/twitter_project/direct_approach.py", line 9, in <module> kvs = KafkaUtils.createDirectStream(ssc, [topic],{"metadata.broker.list": brokers}) File "/home/harry/tutorial-env/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/streaming/kafka.py", line 146, in createDirectStream File "/home/harry/tutorial-env/lib/python3.7/site-packages/pyspark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__ File "/home/harry/tutorial-env/lib/python3.7/site-packages/pyspark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 336, in get_return_value py4j.protocol.Py4JError: An error occurred while calling o24.createDirectStreamWithoutMessageHandler 19/12/14 14:27:25 INFO SparkContext: Invoking stop() from shutdown hook 19/12/14 14:27:25 INFO SparkUI: Stopped Spark web UI at http://220.149.84.46:4041 19/12/14 14:27:25 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 19/12/14 14:27:25 INFO MemoryStore: MemoryStore cleared 19/12/14 14:27:25 INFO BlockManager: BlockManager stopped 19/12/14 14:27:25 INFO BlockManagerMaster: BlockManagerMaster stopped 19/12/14 14:27:25 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 19/12/14 14:27:25 INFO SparkContext: Successfully stopped SparkContext 19/12/14 14:27:25 INFO ShutdownHookManager: Shutdown hook called 19/12/14 14:27:25 INFO ShutdownHookManager: Deleting directory /tmp/spark-8e271f94-bec9-4f7e-aad0-1f3b651e9b29 19/12/14 14:27:25 INFO ShutdownHookManager: Deleting directory /tmp/spark-747cc9ca-bca4-42a7-ad82-d6a055727394 19/12/14 14:27:25 INFO ShutdownHookManager: Deleting directory /tmp/spark-747cc9ca-bca4-42a7-ad82-d6a055727394/pyspark-83cc90cc-1aaa-4dea-b364-4b66487be18f
-
Microsoft.WindowsAzure.Storage.StorageException: There is already a lease present
I am getting the below error when using the Event Hub binding.
public static void Run([EventHubTrigger("tlog_eh_policy", Connection = "EventHub_Conn")] EventData[] events, ILogger log, ExecutionContext context)
Some of the messages are processed and saved by the Azure Function and some are skipped. I found the below error in the Azure Function live metrics. Any idea on this?
Microsoft.WindowsAzure.Storage.StorageException: There is already a lease present.
   at Microsoft.Azure.EventHubs.Processor.AzureStorageCheckpointLeaseManager.AcquireLeaseCoreAsync(AzureBlobLease lease)
   at Microsoft.Azure.EventHubs.Processor.PartitionManager.<>c__DisplayClass12_2.<b__2>d.MoveNext()
Request Information
RequestID:7fa2a7c1-a01e-0046-0b06-b2062d000000
RequestDate:Fri, 13 Dec 2019 22:44:14 GMT
StatusMessage:There is already a lease present.
ErrorCode:LeaseAlreadyPresent
ErrorMessage:There is already a lease present.
RequestId:7fa2a7c1-a01e-0046-0b06-b2062d000000
Time:2019-12-13T22:44:14.1251726Z
-
How do EventHubClient.CreateFromConnectionString / SendAsync behave when the event hub is down or the connection string is wrong?
I have a .NET application writing to Event Hubs via https://docs.microsoft.com/en-us/dotnet/api/microsoft.servicebus.messaging.eventhubclient?view=azure-dotnet :
var eventHubClient = EventHubClient.CreateFromConnectionString(connectionString, eventHubName);
EventHubEvent e = new EventHubEvent() { ... };
eventHubClient.SendAsync(new EventData(Encoding.UTF8.GetBytes(serializer.Serialize(e))));
How does this code behave in case:
1. eventHubName is wrong
2. connectionString has a wrong SharedAccessKeyName
3. the event hub is otherwise unavailable
The documentation is rather scarce, and in my testing it appears that nothing fails: CreateFromConnectionString returns a valid EventHubClient object and SendAsync just returns.
-
Trying to produce data to Azure Event Hubs from Kafka (on premises)
I'm running the Kafka console producer CLI command to produce messages to Event Hubs. I followed the steps below:
1. export EVENT_HUBS_NAMESPACE=event_hub_name_space
2. export EVENT_HUB_NAME=event_hub_name
3. export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/config/jaas.conf"
producer-mirrormaker.config looks like:
bootstrap.servers=eventhubnamespace:9093
client.id=mirror_maker_producer
# Required for Event Hubs
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
java.security.auth.login.config=/path-to/jaas.conf

My jaas.conf file looks like:

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="eventhub connection string"
    password="eventhub_namespace&connectionstring";
};

Kafka CLI command:

./kafka-console-producer.sh --topic event-hub-name --broker-list event-hub-namespace:9093 --producer.config /path-to/producer-mirrormaker.config
After running the command, I tried to post a message. Once the message was posted, an error popped up. I'm new to Kafka; please let me know what to do.
Error Message:
[2019-12-12 20:36:54,333] WARN Failed to send SSL Close message (org.apache.kafka.common.network.SslTransportLayer) java.io.IOException: Broken pipe at sun.nio.ch.FileDispatcherImpl.write0(Native Method) at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) at sun.nio.ch.IOUtil.write(IOUtil.java:65) at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:195) at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:163) at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:690) at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:47) at org.apache.kafka.common.network.Selector.close(Selector.java:487) at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:368) at org.apache.kafka.common.network.Selector.poll(Selector.java:291) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:236) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:135) at java.lang.Thread.run(Thread.java:745) [2019-12-12 20:36:54,739] WARN Failed to send SSL Close message (org.apache.kafka.common.network.SslTransportLayer) java.io.IOException: Broken pipe at sun.nio.ch.FileDispatcherImpl.write0(Native Method) at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) at sun.nio.ch.IOUtil.write(IOUtil.java:65) at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:195) at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:163) at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:690) at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:47) at org.apache.kafka.common.network.Selector.close(Selector.java:487) at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:368) at org.apache.kafka.common.network.Selector.poll(Selector.java:291) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:236) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:135) at java.lang.Thread.run(Thread.java:745) [2019-12-12 20:36:55,142] WARN Failed to send SSL Close message (org.apache.kafka.common.network.SslTransportLayer) java.io.IOException: Broken pipe at sun.nio.ch.FileDispatcherImpl.write0(Native Method) at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) at sun.nio.ch.IOUtil.write(IOUtil.java:65) at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:195) at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:163) at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:690) at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:47) at org.apache.kafka.common.network.Selector.close(Selector.java:487) at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:368) at org.apache.kafka.common.network.Selector.poll(Selector.java:291) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260) at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:236) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:135) at java.lang.Thread.run(Thread.java:745)