
I'm trying to convert a Spark DataFrame to JSON. There are about 1 million rows in this DataFrame; the sample code is below, but the performance is really bad. In the desired output, each member_id appears only once in the JSON file, and likewise each tag_name appears only once under its member_id. Please let me know if there is any way to do this faster.

Sample Code:

import pyspark.sql.functions as ch

result = sdf.groupBy('member_id', 'tag_name') \
    .agg(ch.collect_list(ch.struct('detail_name', 'detail_value')).alias('detail')) \
    .groupBy('member_id') \
    .agg(ch.collect_list(ch.struct('tag_name', 'detail')).alias('tag')) \
    .agg(ch.to_json(ch.collect_list(ch.struct('member_id', 'tag'))).alias('result'))

result.show()

detail.csv:

member_id, tag_name, detail_name, detail_value
-------------------------------------------------------
abc123, m1, Service_A, 20
abc123, m1, Service_B, 20
abc123, m2, Service_C, 10
xyz456, m3, Service_A, 5
xyz456, m3, Service_A, 10

Desired Output JSON:

{   "member_id": "abc123",
    "tag":[ {"tag_name": "m1",
            "detail":[{ "detail_name": "Service_A",
                        "detail_value": "20"},
                    {   "detail_name": "Service_B",
                        "detail_value": "20"}]},
            {"tag_name": "m2",
            "detail":[{ "detail_name": "Service_C",
                        "detail_value": "10"}]}]},
{   "member_id": "xyz456",
    "tag":[{"tag_name": "m3",
            "detail":[{ "detail_name": "Service_A",
                        "detail_value": "5"},
                      { "detail_name": "Service_A",
                        "detail_value": "10"}]}]}
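For reference, the target nesting above can be reproduced in plain Python (no Spark) with itertools.groupby. This is only an illustration of the desired structure built from the sample rows, not the Spark solution itself:

```python
import json
from itertools import groupby
from operator import itemgetter

# Sample rows from detail.csv: (member_id, tag_name, detail_name, detail_value)
rows = [
    ("abc123", "m1", "Service_A", "20"),
    ("abc123", "m1", "Service_B", "20"),
    ("abc123", "m2", "Service_C", "10"),
    ("xyz456", "m3", "Service_A", "5"),
    ("xyz456", "m3", "Service_A", "10"),
]

members = []
# Rows must already be sorted by member_id, then tag_name, as groupby requires.
for member_id, member_rows in groupby(rows, key=itemgetter(0)):
    tags = []
    for tag_name, tag_rows in groupby(member_rows, key=itemgetter(1)):
        detail = [{"detail_name": d, "detail_value": v} for _, _, d, v in tag_rows]
        tags.append({"tag_name": tag_name, "detail": detail})
    members.append({"member_id": member_id, "tag": tags})

print(json.dumps(members, indent=2))
```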

duplicate.csv:

member_id, tag_name, detail_name, detail_value
-------------------------------------------------------
abc123, m1, problem_no, 'abc123xyz'
abc123, m1, problem_no, 'abc456zzz'
xyz456, m1, problem_no, 'abc123xyz'
xyz456, m1, problem_no, 'abc456zzz'

Duplicate Output JSON:

{   "member_id": "abc123",
    "tag":[ {"tag_name": "m1",
            "detail":[{ "detail_name": "problem_no",
                        "detail_value": "abc123xyz"},
                    {   "detail_name": "problem_no",
                        "detail_value": "abc456zzz"},
                      { "detail_name": "problem_no",
                        "detail_value": "abc123xyz"},
                    {   "detail_name": "problem_no",
                        "detail_value": "abc456zzz"}]}]},
{   "member_id": "xyz456",
    "tag":[ {"tag_name": "m1",
            "detail":[{ "detail_name": "problem_no",
                        "detail_value": "abc123xyz"},
                    {   "detail_name": "problem_no",
                        "detail_value": "abc456zzz"},
                      { "detail_name": "problem_no",
                        "detail_value": "abc123xyz"},
                    {   "detail_name": "problem_no",
                        "detail_value": "abc456zzz"}]}]}

1 Answer


Would you mind implementing it with a SQL statement?

Construct the structs layer by layer, and finally use the to_json function to generate the JSON string.

df.createOrReplaceTempView('tmp')
sql = """
    select to_json(collect_list(struct(member_id, tag))) as member
    from
        (select member_id, collect_list(struct(tag_name, detail)) as tag
         from
            (select member_id, tag_name, collect_list(struct(detail_name, detail_value)) as detail
             from tmp
             group by member_id, tag_name) t1
         group by member_id) t2
"""
df = spark.sql(sql)
df.show(truncate=False)

2 Comments

It looks good overall, but there is a duplicate issue and I can't find the root cause. I updated the sample DataFrame and the duplicated outcome above.
My test results are correct and there is no duplicate data.
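One plausible cause of the duplication (an assumption, since the answerer cannot reproduce it): the input already contains duplicate rows, and collect_list keeps every occurrence. In Spark, deduplicating first with sdf.dropDuplicates() (or using collect_set instead of collect_list, at the cost of element order) avoids this. A minimal plain-Python sketch of the same idea:

```python
# Minimal sketch (plain Python, not Spark): collect_list keeps every row,
# so dropping exact-duplicate rows first mirrors sdf.dropDuplicates().
rows = [
    ("abc123", "m1", "problem_no", "abc123xyz"),
    ("abc123", "m1", "problem_no", "abc456zzz"),
    ("abc123", "m1", "problem_no", "abc123xyz"),  # duplicate row
]

# dict.fromkeys preserves first-seen order while dropping duplicate rows
deduped = list(dict.fromkeys(rows))

detail = [{"detail_name": d, "detail_value": v} for _, _, d, v in deduped]
```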
