
I am getting the below error:

py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.

on this line:

result = df.select('student_age').rdd.flatMap(lambda x: x).collect()

'student_age' is a column name. It was running fine until last week, but now I get this error.

Does anyone have any insights on that?

5 Comments
  • Could you share more about the error log and your code? Commented Aug 6, 2022 at 10:55
  • Is this the full stack trace? Also, "running fine until last week" -- have you updated anything recently, like the Java version? Also, where are you running this? Commented Aug 7, 2022 at 6:43
  • No, nothing has changed. Commented Aug 7, 2022 at 22:36
  • All logs: ctxt.io/2/AADge2-UFg Commented Aug 8, 2022 at 0:39
  • @Slickmind could you show the code of your data_percentage and count_percentage functions? Commented Aug 8, 2022 at 4:40

2 Answers


Using collect is dangerous for this very reason: it is prone to Out Of Memory (OOM) errors. I suggest removing it. You also do not need an RDD for this; you can do it with a DataFrame:

from pyspark.sql.functions import explode

result = df.select(explode(df['student_age']))  # returns a DataFrame
# Write your downstream code against the DataFrame instead of a Python list.
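For illustration, here is a minimal sketch of staying entirely in the DataFrame API; the demo data and the percentage aggregation are assumptions standing in for your real logic:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
# Hypothetical demo data standing in for your real df.
df = spark.createDataFrame([(15,), (16,), (16,), (17,)], ['student_age'])

total = df.count()  # a plain Python int on the driver
# Aggregate inside Spark instead of collecting values into a Python list.
age_counts = (df.groupBy('student_age')
                .count()
                .withColumn('percentage', F.col('count') * 100.0 / total))
age_counts.show()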

If nothing else changed, the data likely did, and it finally outgrew the memory available to the driver.

It's also possible that you have new 'bad' data that is throwing an error.

Either way, you can likely prove this by finding the OOM in the logs, or prove the data is bad by printing it:

def f(row):
    print(row.student_age)

result.foreach(f) # used for simple stuff that doesn't require heavy initialization.

If that works, you may want to break your code down to use foreachPartition. This lets you do math on each value in the memory of each executor. The only trick is that inside f below, because the code runs on the executors, you cannot reference anything that uses the SparkContext (plain Python only, no PySpark):

def f(rows):
    # initialize a database connection here
    for row in rows:
        print(row.student_age)  # do stuff with student_age
    # close database connection here

result.foreachPartition(f) # used for things that need heavy initialization
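As a rough sketch (not your actual calculation), the per-partition function can do its math in plain Python; note that print output from executors shows up in the executor logs unless you are running in local mode:

def summarize_partition(rows):
    # Plain Python only in here -- no SparkSession or SparkContext references.
    ages = [row.student_age for row in rows]  # assumes the column is named student_age
    if ages:
        print(f"partition size={len(ages)}, min={min(ages)}, max={max(ages)}")

df.select('student_age').foreachPartition(summarize_partition)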

See also: Spark foreachPartition vs foreach | what to use?


1 Comment

Thanks Matt! Let me try it!

This issue is solved; here is the answer:

result = [i[0] for i in df.select('student_age').toLocalIterator()]
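For context, a small self-contained sketch of this approach next to the DataFrame-native alternative suggested in the comments below (the demo data is hypothetical):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(15,), (16,), (16,), (17,)], ['student_age'])

# toLocalIterator() streams one partition at a time to the driver,
# so peak driver memory stays lower than with collect().
result = [row[0] for row in df.select('student_age').toLocalIterator()]

# Staying in Spark until the last step: distinct() for unique values.
unique_ages = [row[0] for row in df.select('student_age').distinct().collect()]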

3 Comments

This may work for your use case, as it does decrease memory pressure by using an iterator instead of collect(). But you are not doing PySpark at this point; you are doing Python coding. That is to say, you are using a small-data approach instead of a big-data approach. I'd encourage you to think bigger and use DataFrames over Python arrays; it will scale better.
I am getting this error: File "C:\Users\Jarvis\AppData\Local\Programs\Python\Python38\lib\site-packages\pyspark\sql\column.py", line 470, in __iter__: raise TypeError("Column is not iterable") -- from this code: result = df.select(explode(df['student_age'])); unique_result = list(set([j for i in result for j in xs]))
You are still using Python instead of PySpark. To make the column unique, just call distinct() on it. You might benefit from studying PySpark more. Clearly you know Python.
