Flink JobManager HA on EMR
Stack: EMR emr-6.1.0 (1 master, 4 core nodes)
Installed applications: Flink 1.11.0
The AWS documentation (https://docs.aws.amazon.com/emr/latest/ReleaseGuide/flink-configure.html) says:
Beginning with Amazon EMR version 5.28.0, JobManager high availability is also enabled automatically. No manual configuration is needed.
But when I send a kill signal to the Flink JobManager's YARN container:

yarn container -signal container_1601027657994_0003_01_000001 GRACEFUL_SHUTDOWN

(same with FORCEFUL_SHUTDOWN), nothing happens. YARN won't restart the application.
- Do I need to enable ZooKeeper on the EMR cluster as well? (Most probably yes; otherwise I don't understand how Flink would know from which savepoint to restart the application.)
- Should I use an EMR cluster with 3 master nodes to get HA for Flink?
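For context, here is a minimal sketch of the ZooKeeper-based HA settings that drive JobManager failover on YARN. The key names are from the Flink documentation; the quorum address and S3 path are placeholders, and on EMR some of these may already be populated automatically. (On recovery, Flink resumes from the latest completed checkpoint recorded in ZooKeeper, not from a savepoint.)

flink-conf.yaml (sketch):

# Use ZooKeeper for leader election and to track the latest completed checkpoint.
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-host:2181          # placeholder quorum address
high-availability.zookeeper.path.root: /flink
# JobManager metadata (pointers to checkpoints) is persisted here, not in ZooKeeper itself.
high-availability.storageDir: s3://my-bucket/flink/ha/    # placeholder bucket
# Allow YARN to restart the ApplicationMaster after a failure.
yarn.application-attempts: 10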
See also questions close to this topic
-
Analyzing DynamoDB data using AWS Athena
I have a DynamoDB database with tables and items that I want to create a dashboard for. After some research, I learned that AWS Athena and QuickSight let me analyze, query, and build a dashboard for my site. I set up the necessary connectors to stream DynamoDB table items through Lambda into an S3 bucket, which is crawled with AWS Glue and then queryable in Athena. My question is: does this mean all my DynamoDB table items are stored twice, once in DynamoDB and once in the S3 bucket that Athena queries?
Is it practical to have my data living in two places? Are there any other solutions?
-
Switching profiles AWS Secret Manager - .Net Core
I am trying to switch profiles in code using appsettings.json for a .NET Core web service. If I new up an instance of the client in code, e.g.:
using var client = new AmazonSecretsManagerClient();
client.ListSecretsAsync(listRequest, default)
    .GetAwaiter()
    .GetResult();
this takes the default profile that was set up and works fine.
Instead of using the default profile, I want to control the profile from the JSON file. I am trying to follow this:
https://docs.aws.amazon.com/sdk-for-net/latest/developer-guide/net-dg-config-netcore.html
Instead of newing up the client, I am using dependency injection:
public SecretsService(IAmazonSecretsManager secretsManagerClient)
{
    _secretsManagerClient = secretsManagerClient;
}
Here is the config:
"AWS": { "Profile": "default", "Region": "us-east-2" }
When I make a call using

_secretsManagerClient.ListSecretsAsync(listRequest, default)
    .GetAwaiter()
    .GetResult();

it throws an exception:
No such host is known. (secretsmanager.defaultregion.amazonaws.com:443)
What am I missing?
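For comparison, this is the registration shape the linked guide describes for AWSSDK.Extensions.NETCore.Setup; the Startup skeleton below is a sketch, not code from the question. One observation: the exception's hostname (secretsmanager.defaultregion.amazonaws.com) suggests the region value actually being resolved is the literal string "defaultregion" rather than "us-east-2", so it may be worth checking for a stray AWS_REGION environment variable or a mis-keyed setting overriding the appsettings.json section.

// Startup.cs (sketch) - assumes the AWSSDK.Extensions.NETCore.Setup package is referenced.
using Amazon.SecretsManager;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;

public class Startup
{
    public Startup(IConfiguration configuration) => Configuration = configuration;
    public IConfiguration Configuration { get; }

    public void ConfigureServices(IServiceCollection services)
    {
        // Reads the "AWS" section (Profile, Region) from appsettings.json.
        services.AddDefaultAWSOptions(Configuration.GetAWSOptions());
        // Registers IAmazonSecretsManager with those options for constructor injection.
        services.AddAWSService<IAmazonSecretsManager>();
    }
}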
-
DynamoDB Enhanced Client (very) slow start for 1st request, possible to do "select 1 from"?
We are attempting to use Kotlin with AWS Lambda, and it seems that the first query executed by DynamoDbEnhancedClient always takes up to 3 seconds, while any subsequent query takes less than 50 ms.
We do have a warmup process that pre-warms these Lambdas on a schedule. Is there a way to fire a request via DynamoDbEnhancedClient without incurring cost, like a describe or a "select 1 from"?
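A hedged idea for the warmup: DescribeTable is a control-plane call that consumes no read capacity, so firing it at the plain DynamoDbClient underneath the enhanced client exercises the HTTP connection pool, credential chain, and marshalling layers for free. The table name below is a placeholder:

import software.amazon.awssdk.services.dynamodb.DynamoDbClient
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest

// Sketch: warm the SDK's lazy initialization without consuming RCUs.
fun warmup(ddb: DynamoDbClient) {
    // Control-plane call: free of read-capacity cost, but it still opens the
    // connection pool and initializes the serialization machinery.
    ddb.describeTable(DescribeTableRequest.builder().tableName("my-table").build())
}

It won't pre-build the enhanced client's table schema, though, which may account for part of the 3 seconds.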
-
Apache Flink - Checkpointed Data Size is increasing over the period of time
I am using event-time semantics in my Flink application (version 1.11.1), which runs on AWS Kinesis Data Analytics. The application has a Kinesis stream as source and Postgres as sink. The checkpointing interval is 10 seconds, as the DB sink is triggered on notifyCheckpointComplete(). I am using multiple CoProcessFunctions along with ValueState to connect different streams before sinking to Postgres.
The observation is that the checkpointed data size grows over time, whereas thread count and heap memory utilization remain constant and CPU utilization stays below 30 percent. I was hoping the checkpointed data size would eventually plateau.
Going through the Flink documentation on state TTL, it seems that state TTL is currently only supported for processing-time semantics: State Time-To-Live (TTL).
What is the way forward for an event-time-based Flink application?
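Until event-time TTL exists, the usual workaround is manual cleanup with event-time timers inside a keyed function; the same pattern applies to KeyedCoProcessFunction for the connected streams. A minimal sketch (the types, names, and one-hour retention are illustrative, not taken from the application above):

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

class ExpiringValueFunction extends KeyedProcessFunction[String, String, String] {
  @transient private var state: ValueState[String] = _
  @transient private var cleanupTs: ValueState[java.lang.Long] = _
  private val retentionMs = 60 * 60 * 1000L // illustrative one-hour retention

  override def open(parameters: Configuration): Unit = {
    state = getRuntimeContext.getState(new ValueStateDescriptor[String]("value", classOf[String]))
    cleanupTs = getRuntimeContext.getState(new ValueStateDescriptor[java.lang.Long]("cleanupTs", classOf[java.lang.Long]))
  }

  override def processElement(value: String,
                              ctx: KeyedProcessFunction[String, String, String]#Context,
                              out: Collector[String]): Unit = {
    state.update(value)
    // Push the cleanup horizon forward with every element seen for this key.
    val ts = ctx.timestamp() + retentionMs
    cleanupTs.update(ts)
    ctx.timerService().registerEventTimeTimer(ts)
    out.collect(value)
  }

  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[String, String, String]#OnTimerContext,
                       out: Collector[String]): Unit =
    // Only the most recent timer clears the state; earlier timers are stale.
    if (cleanupTs.value() == timestamp) {
      state.clear()
      cleanupTs.clear()
    }
}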
-
Trying to read a file from S3 with Flink from the IDE, getting Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
I am trying to read a file from S3 using Flink from IntelliJ and I get the following exception:
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
This is how my code looks:
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.column.page.PageReadStore
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.parquet.io.ColumnIOFactory

class ParquetSourceFunction extends SourceFunction[String] {
  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    val inputPath = "s3a://path-to-bucket/"
    val outputPath = "s3a://path-to-output-bucket/"
    val conf = new Configuration()
    conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    val readFooter = ParquetFileReader.open(HadoopInputFile.fromPath(new Path(inputPath), conf))
    val metadata = readFooter.getFileMetaData
    val schema = metadata.getSchema
    val parquetFileReader = new ParquetFileReader(conf, metadata, new Path(inputPath), readFooter.getRowGroups, schema.getColumns)
    // val parquetFileReader2 = new ParquetFileReader(new Path(inputPath), ParquetReadOptions)
    var pages: PageReadStore = null
    try {
      while ({ pages = parquetFileReader.readNextRowGroup; pages != null }) {
        val rows = pages.getRowCount
        val columnIO = new ColumnIOFactory().getColumnIO(schema)
        val recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema))
        (0L until rows).foreach { _ =>
          val group = recordReader.read()
          val myString = group.getString("field_name", 0)
          ctx.collect(myString)
        }
      }
    }
  }

  override def cancel(): Unit = ???
}

object Job {
  def main(args: Array[String]): Unit = {
    // set up the execution environment
    lazy val env = StreamExecutionEnvironment.getExecutionEnvironment
    lazy val stream = env.addSource(new ParquetSourceFunction)
    stream.print()
    env.execute()
  }
}
Sbt dependencies:

val flinkVersion = "1.12.1"

val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-clients" % flinkVersion, // % "provided"
  "org.apache.flink" %% "flink-scala" % flinkVersion, // % "provided"
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion, // % "provided"
  "org.apache.flink" %% "flink-parquet" % flinkVersion)

lazy val root = (project in file(".")).
  settings(
    libraryDependencies ++= flinkDependencies,
    libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "3.3.0",
    libraryDependencies += "org.apache.parquet" % "parquet-hadoop" % "1.11.1",
    libraryDependencies += "org.apache.flink" %% "flink-table-planner-blink" % "1.12.1" // % "provided"
  )
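A hunch about the missing class: org.apache.hadoop.fs.s3a.S3AFileSystem ships in the hadoop-aws artifact, which hadoop-common does not pull in transitively. Something along these lines, with the version matched to the hadoop-common above, is a sketch of the likely fix:

libraryDependencies += "org.apache.hadoop" % "hadoop-aws" % "3.3.0" // provides S3AFileSystem and pulls in the AWS SDK bundle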
-
Can I run Apache Flink within an Android App?
I was trying to run Apache Flink within an Android App. I just want to run a minimum working example, like this:
@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_main);
    runFlinkExample();
}

private void runFlinkExample() {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<Integer> stream = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5));
    stream.print();
    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
These are my two .gradle files:
build.gradle (Module)
plugins {
    id 'com.android.application'
}

android {
    compileSdkVersion 30
    buildToolsVersion "30.0.3"

    defaultConfig {
        applicationId "com.example.flink"
        minSdkVersion 26
        targetSdkVersion 30
        versionCode 1
        versionName "1.0"
        testInstrumentationRunner "androidx.test.runner.AndroidJUnitRunner"
    }

    buildTypes {
        release {
            minifyEnabled false
            proguardFiles getDefaultProguardFile('proguard-android-optimize.txt'), 'proguard-rules.pro'
        }
    }

    compileOptions {
        sourceCompatibility JavaVersion.VERSION_1_8
        targetCompatibility JavaVersion.VERSION_1_8
    }

    packagingOptions {
        exclude 'META-INF/DEPENDENCIES'
        exclude 'reference.conf'
    }
}

dependencies {
    implementation 'androidx.appcompat:appcompat:1.2.0'
    implementation 'com.google.android.material:material:1.3.0'
    implementation 'androidx.constraintlayout:constraintlayout:2.0.4'
    testImplementation 'junit:junit:4.+'
    androidTestImplementation 'androidx.test.ext:junit:1.1.2'
    androidTestImplementation 'androidx.test.espresso:espresso-core:3.3.0'

    // Flink
    implementation 'org.apache.flink:flink-streaming-java_2.12:1.12.1'
    implementation 'org.apache.flink:flink-clients_2.12:1.12.1'
}
build.gradle (Project)
// Top-level build file where you can add configuration options common to all sub-projects/modules.
buildscript {
    repositories {
        google()
        jcenter()
    }
    dependencies {
        classpath "com.android.tools.build:gradle:4.1.2"
        // NOTE: Do not place your application dependencies here; they belong
        // in the individual module build.gradle files
    }
}

allprojects {
    repositories {
        google()
        jcenter()
    }
}

task clean(type: Delete) {
    delete rootProject.buildDir
}
The first problem is that I get the following error:
Caused by: java.lang.ClassNotFoundException: Didn't find class "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment" on path: DexPathList[[zip file "/data/app/~~DbT_CZ7AhLED2xZgLBk ....
In cases where this error doesn't appear, I get Akka actor errors, because I have to exclude 'reference.conf', otherwise the code won't compile. However, this leads to more exceptions, e.g. a missing akka-version.
So my general question is: is it possible to run Flink within an Android app, or is this not possible (or not recommended)? Perhaps someone knows how to modify my Gradle files (or something else) to run my example, or has already used Flink on Android successfully.
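Regarding the reference.conf part specifically, a sketch of one thing worth trying before giving up: the Android Gradle plugin can merge duplicate resources rather than exclude them, which keeps Akka's stacked configuration (including akka-version) intact. Whether Flink's runtime can run on ART at all is a separate question, so treat this purely as an experiment:

packagingOptions {
    // Concatenate every reference.conf found in the Flink/Akka jars instead of
    // dropping them; Akka resolves its defaults from the merged file.
    merge 'reference.conf'
}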
-
Error running Scala Spark application with AWS
I have deployed my Spark application's jar on an EMR cluster and submitted the Spark job using the spark-submit command followed by my project jar. Unfortunately, I receive this error:
21/03/02 21:08:48 WARN SparkSession$Builder: Using an existing SparkSession; some spark core configurations may not take effect. 21/03/02 21:08:56 WARN TaskSetManager: Lost task 1.0 in stage 1.0 (TID 2, ip-172-31-13-24.ec2.internal, executor 1): java.lang.ExceptionInInitializerError at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1274) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461) at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2296) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503) at 
java.io.ObjectInputStream.readObject(ObjectInputStream.java:461) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83) at org.apache.spark.scheduler.Task.run(Task.scala:123) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.spark.SparkException: A master URL must be set in your configuration at org.apache.spark.SparkContext.<init>(SparkContext.scala:368) at RunAWS$.<init>(RunAWS.scala:18) at RunAWS$.<clinit>(RunAWS.scala) ... 55 more 21/03/02 21:08:56 WARN TaskSetManager: Lost task 1.1 in stage 1.0 (TID 3, ip-172-31-13-24.ec2.internal, executor 1): java.lang.NoClassDefFoundError: Could not initialize class RunAWS$ at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1274) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461) at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2296) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83) at org.apache.spark.scheduler.Task.run(Task.scala:123) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 21/03/02 21:08:57 ERROR TaskSetManager: Task 1 in stage 1.0 failed 4 times; aborting job Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 1.0 failed 4 times, most recent failure: Lost task 1.3 in stage 1.0 (TID 5, ip-172-31-13-78.ec2.internal, executor 2): java.lang.NoClassDefFoundError: Could not initialize class RunAWS$ at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1274) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187) at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461) at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2296) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83) at org.apache.spark.scheduler.Task.run(Task.scala:123) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2080) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2068) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2067) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2067) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:988) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:988) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:988) at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2301) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2250) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2239) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:799) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126) at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:385) at org.apache.spark.rdd.RDD.collect(RDD.scala:989) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$collectAsMap$1.apply(PairRDDFunctions.scala:743) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$collectAsMap$1.apply(PairRDDFunctions.scala:742) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:385) at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:742) at RunAWS$.main(RunAWS.scala:397) at RunAWS.main(RunAWS.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:853) at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161) at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184) at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86) at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:928) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:937) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.NoClassDefFoundError: Could not initialize class RunAWS$ at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1274) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405) at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461) at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2296) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83) at org.apache.spark.scheduler.Task.run(Task.scala:123) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)
As I can see from the first line, I think it is a problem with the Spark session, but I'm not sure.
The code where I declare the Spark session is:
object RunAWS {
  // SET OF SPARK ENVIRONMENT
  val conf = new SparkConf().setAppName("MyApp")
  val sc = new SparkContext(conf)
  val ss = SparkSession
    .builder()
    .appName("MyApplication")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()

  // REMOVE LOGS FROM TERMINAL
  ss.sparkContext.setLogLevel("WARN")
  ....
Can someone help me?
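For what it's worth, a sketch of the restructuring that usually fixes this pattern: the vals in the RunAWS object body run during static initialization, and the executors also initialize RunAWS$ when deserializing the job's lambdas, where no master URL is set (hence the ExceptionInInitializerError followed by NoClassDefFoundError: Could not initialize class RunAWS$). Building the session inside main, and only there, avoids that; names mirror the question and the body is illustrative:

import org.apache.spark.sql.SparkSession

object RunAWS {
  def main(args: Array[String]): Unit = {
    // Created inside main: executors deserializing closures never run this,
    // and getOrCreate inherits the master URL supplied by spark-submit.
    val ss = SparkSession.builder()
      .appName("MyApplication")
      .getOrCreate()
    ss.sparkContext.setLogLevel("WARN")
    // ... rest of the job, using ss and ss.sparkContext
  }
}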
-
How to install Cudatoolkit on AWS EMR to enable distributed training in Tensorflow?
I'm trying to run some tensorflow-recommenders code on an AWS EMR cluster.
My testing Spark cluster has a g3.4xlarge master node and a few g3.4xlarge core nodes.
I have a bootstrap action which does the following:
sudo yum -y install cudatoolkit
sudo yum -y install nvidia
At runtime, however, it appears that some of the cudatoolkit libraries aren't found:
Not creating XLA devices, tf_xla_enable_xla_devices not set
Successfully opened dynamic library libcuda.so.1
I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:941] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
I tensorflow/core/common_runtime/gpu/gpu_device.cc:1720] Found device 0 with properties: pciBusID: 0000:00:1e.0 name: Tesla M60 computeCapability: 5.2 coreClock: 1.1775GHz coreCount: 16 deviceMemorySize: 7.44GiB deviceMemoryBandwidth: 149.31GiB/s
W tensorflow/stream_executor/platform/default/dso_loader.cc:60] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native:/docker/usr/lib/hadoop/lib/native:/docker/usr/lib/hadoop-lzo/lib/native
W tensorflow/stream_executor/platform/default/dso_loader.cc:60] Could not load dynamic library 'libcublas.so.11'; dlerror: libcublas.so.11: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native:/docker/usr/lib/hadoop/lib/native:/docker/usr/lib/hadoop-lzo/lib/native
W tensorflow/stream_executor/platform/default/dso_loader.cc:60] Could not load dynamic library 'libcublasLt.so.11'; dlerror: libcublasLt.so.11: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native:/docker/usr/lib/hadoop/lib/native:/docker/usr/lib/hadoop-lzo/lib/native
I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcufft.so.10
I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcurand.so.10
I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcusolver.so.10
W tensorflow/stream_executor/platform/default/dso_loader.cc:60] Could not load dynamic library 'libcusparse.so.11'; dlerror: libcusparse.so.11: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native:/docker/usr/lib/hadoop/lib/native:/docker/usr/lib/hadoop-lzo/lib/native
W tensorflow/stream_executor/platform/default/dso_loader.cc:60] Could not load dynamic library 'libcudnn.so.8'; dlerror: libcudnn.so.8: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native:/docker/usr/lib/hadoop/lib/native:/docker/usr/lib/hadoop-lzo/lib/native
W tensorflow/core/common_runtime/gpu/gpu_device.cc:1757] Cannot dlopen some GPU libraries. Please make sure the missing libraries mentioned above are installed properly if you would like to use GPU. Follow the guide at https://www.tensorflow.org/install/gpu for how to download and setup the required libraries for your platform.
Would I need to augment the LD_LIBRARY_PATH within my bootstrap action? I'd worry about whether it would take effect without a reboot, and about trying to reboot from within a bootstrap action.
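A sketch of one approach that sidesteps the reboot concern entirely: inject the library path through an EMR configuration classification so Spark sets it on the executor processes itself. The CUDA directory below is a guess at where the yum packages install; adjust it to whatever the bootstrap actually lays down:

[
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.executorEnv.LD_LIBRARY_PATH": "/usr/local/cuda/lib64:/usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native"
    }
  }
]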
Has anyone else come across this type of problem? Any recommendations? Thanks.
-
How to fix java.lang.NoSuchMethodError on AWS EMR/Spark?
I am running a Spark project on AWS EMR which includes the following dependencies:
"com.google.apis" % "google-api-services-analyticsreporting" % "v4-rev20210106-1.31.0"
"com.google.api-client" % "google-api-client" % "1.31.1" excludeAll(ExclusionRule(organization = "com.google.guava"))
"com.google.guava" % "guava" % "29.0-jre"
However at runtime I get the error:
ERROR ApplicationMaster: User class threw exception: java.lang.NoSuchMethodError: com.google.common.io.ByteStreams.exhaust(Ljava/io/InputStream;)J
java.lang.NoSuchMethodError: com.google.common.io.ByteStreams.exhaust(Ljava/io/InputStream;)J
    at com.google.api.client.http.ConsumingInputStream.close(ConsumingInputStream.java:40)
    ....
This is the assembly definition:
lazy val assemblySettings = nocomma {
  assembly / target := (baseDirectory in ThisBuild).value / "target" / "jars"
  assembly / assemblyJarName := s"${name.value}.jar"
  assembly / test := {}
  assembly / assemblyOption := (assemblyOption in assembly).value.copy(includeScala = false)
  assembly / assemblyMergeStrategy := {
    case PathList("log4j.properties") => MergeStrategy.discard
    case PathList("/org/apache/log4j", _@_*) => MergeStrategy.discard
    case PathList("META-INF", _@_*) => MergeStrategy.discard
    case x if Assembly.isConfigFile(x) => MergeStrategy.concat
    case _ => MergeStrategy.first
  }
}
After I have unzipped the jar, I can see the .class files in the right places. Any idea?
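A hedged guess at the cause: Spark and Hadoop put their own, older Guava on the cluster classpath ahead of the application jar, so ByteStreams.exhaust resolves against that copy instead of the bundled 29.0-jre. Shading Guava inside the assembly is the usual workaround with sbt-assembly (the shadedguava prefix is arbitrary):

assembly / assemblyShadeRules := Seq(
  // Relocate Guava so the Google client libraries link against the bundled
  // 29.0-jre classes rather than the older Guava provided by Spark/Hadoop.
  ShadeRule.rename("com.google.common.**" -> "shadedguava.@1").inAll
)
-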
How to have highly available Moodle in Kubernetes?
I want to set up a highly available Moodle in K8s (on-prem). I'm using Bitnami Moodle with Helm charts.
After a successful Moodle installation, it works. But when a K8s node goes down, the Moodle web page reverts/redirects to the Moodle installation page, like a loop.
Persistent storage is rook-ceph. The Moodle PVC is ReadWriteMany, while the MySQL one is ReadWriteOnce.
The following command was used to deploy Moodle.
helm install moodle --set global.storageClass=rook-cephfs,replicaCount=3,persistence.accessMode=ReadWriteMany,allowEmptyPassword=false,moodlePassword=Moodle123,mariadb.architecture=replication bitnami/moodle
Any help on this is appreciated.
Thanks.
-
High-Availability not working in Hadoop cluster
I am trying to move my non-HA NameNode to HA. After setting up all the configurations for the JournalNodes by following the Apache Hadoop documentation, I was able to bring the NameNodes up. However, the NameNodes crash immediately, throwing the following error:
ERROR org.apache.hadoop.hdfs.server.namenode.NameNode: Failed to start namenode.
java.io.IOException: There appears to be a gap in the edit log. We expected txid 43891997, but got txid 45321534.
I tried to recover the edit logs, initialize the shared edits, etc., but nothing works. I am not sure how to fix this problem without formatting the NameNode, since I do not want to lose any data.
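For reference, a sketch of the recovery-mode entry point usually reached for edit-log gaps. It can deliberately skip unreadable or missing transactions, so back up the NameNode metadata directories (dfs.namenode.name.dir) before running it:

# Back up the metadata directories first; recovery mode may discard transactions.
hdfs namenode -recover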
Any help is greatly appreciated. Thanks in advance.
-
Apache Kafka Consume from Slave/ISR node
I understand the concept of master/slave and data replication in Kafka, but I don't understand why consumers and producers are always routed to the master node of a partition when writing/reading, instead of being able to read from any ISR (in-sync replica)/slave.
The way I think about it, if all consumers are directed to one single master node, then more hardware is required to handle the read/write load from large consumer groups/producers.
Is it possible to read and write on slave nodes, or will consumers/producers always reach out to the master node of that partition?
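For the read side specifically, Kafka 2.4+ (KIP-392) does allow consumers to fetch from follower replicas; produces still have to go to the partition leader. A sketch of the documented settings, with placeholder rack values:

# Broker: choose the fetch replica by rack instead of always serving from the leader.
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector
broker.rack=us-east-1a

# Consumer: advertise the client's rack so a nearby in-sync replica can serve fetches.
client.rack=us-east-1a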