
=========== UPDATED ========

I added some more details to my JSON (struct_c and array_d) to make it clearer where I got the exception.

============================

Good day,

I have a Spark DataFrame with a nested array of struct type. I want to select a column from that struct, but I get the error message: "org.apache.spark.sql.AnalysisException: cannot resolve 'home.array_a.array_b['a']' due to data type mismatch: argument 2 requires integral type, however, ''a'' is of string type".

Here is my data:

{
  "home": {
    "a_number": 5,
    "a_string": "six",
    "array_a": [
      {
        "array_b": [{"a": "1", "b": 2}],
        "struct_c": {"a": 1.1, "b": 1.3},
        "array_d": ["a", "b", "c"]
      },
      {
        "array_b": [{"a": "3", "b": 4}],
        "struct_c": {"a": 1.5, "b": 1.6},
        "array_d": ["x", "y", "z"]
      }
    ]
  }
}

Here is my data schema:

mydf1 = spark.read.option("multiline", "true").json("myJson.json")
mydf1.printSchema()

root
 |-- home: struct (nullable = true)
 |    |-- a_number: long (nullable = true)
 |    |-- a_string: string (nullable = true)
 |    |-- array_a: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- array_b: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- a: string (nullable = true)
 |    |    |    |    |    |-- b: long (nullable = true)
 |    |    |    |-- array_d: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- struct_c: struct (nullable = true)
 |    |    |    |    |-- a: double (nullable = true)
 |    |    |    |    |-- b: double (nullable = true)

When I select data from either struct_c or array_d (the array of strings) inside array_a, there is no issue.

mydf1.select("home.array_a.array_d").show(10, False)

+----------------------+
|array_d               |
+----------------------+
|[[a, b, c], [x, y, z]]|
+----------------------+

from pyspark.sql.functions import col

mydf1.select(col("home.array_a.struct_c.a").alias("struct_field_inside_arrayA")).show(10, False)

+--------------------------+
|struct_field_inside_arrayA|
+--------------------------+
|[1.1, 1.5]                |
+--------------------------+

And here is where it failed:

mydf1.select("home.array_a.array_b.a").printSchema()
mydf1.select("home.array_a.array_b.a").show()

What I expect is a two-dimensional array of strings ([["1", "3"]] for my sample JSON).

Could you please help explain why it failed?

Thanks for your help.

Fail to execute line 4: mydf1.select("home.array_a.array_b.a").printSchema()
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o15300.select.
: org.apache.spark.sql.AnalysisException: cannot resolve 'home.array_a.array_b['a']' due to data type mismatch: argument 2 requires integral type, however, ''a'' is of string type.;;
'Project [home#18213.array_a.array_b[a] AS a#18217]
+- Relation[home#18213] json

at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$3.applyOrElse(CheckAnalysis.scala:115) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$3.applyOrElse(CheckAnalysis.scala:107) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:278) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:278) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70) at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:275) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:275) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:326) at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187) at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:324) at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:275) at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:93) at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:93) at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:105) at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:105) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70) at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:104) at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:116) at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$2.apply(QueryPlan.scala:121) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:121) at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:126) at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187) at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:126) at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:93) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:107) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85) at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:85) at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:95) at 
org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:108) at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:105) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:201) at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105) at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57) at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55) at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47) at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:79) at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:3407) at org.apache.spark.sql.Dataset.select(Dataset.scala:1335) at sun.reflect.GeneratedMethodAccessor348.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-5197917387349583174.py", line 380, in
    exec(code, _zcUserQueryNameSpace)
  File "", line 4, in
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 1320, in select
    jdf = self._jdf.select(self._jcols(*cols))
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: "cannot resolve 'home.array_a.array_b['a']' due to data type mismatch: argument 2 requires integral type, however, ''a'' is of string type.;;\n'Project [home#18213.array_a.array_b[a] AS a#18217]\n+- Relation[home#18213] json\n"

4 Answers


Since array_a and array_b are of array type, you cannot select their elements directly.

You need to explode them as below, or access them by index.

mydf1.withColumn("array_a", explode($"home.array_a"))
  .withColumn("array_b", explode($"array_a.array_b"))
  .select("array_b.a").show(false)

This will give you:

+---+
|a  |
+---+
|1  |
|3  |
+---+
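
Since the question uses PySpark, the same explode-based approach can be sketched in Python as follows (a minimal sketch, assuming the mydf1 DataFrame from the question):

from pyspark.sql.functions import col, explode

# Explode the outer array, then the inner one, to get one row per array_b element
(mydf1
    .withColumn("array_a", explode(col("home.array_a")))
    .withColumn("array_b", explode(col("array_a.array_b")))
    .select("array_b.a")
    .show(truncate=False))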

1 Comment

Thank you Shankar. However, "Since array_a and array_b are of array type, you cannot select their elements directly" <<< this is not true; as in my original post, it is possible to select "home.array_a.another_number". I don't want to use explode though, as I will end up having too many records with duplicated values in other columns, and in the subsequent aggregations there would be a need to do a groupBy.

Since you have no problem with the element_at() function, I suppose you are using Spark 2.4+, so you can try the Spark SQL built-in functions transform [1][2] + flatten:

>>> mydf1.selectExpr('flatten(transform(home.array_a.array_b, x -> x.a)) as array_field_inside_array').show()
+------------------------+
|array_field_inside_array|
+------------------------+
|                  [1, 3]|
+------------------------+

Here we use the transform() function to retrieve only the values of field a from each array element of home.array_a.array_b, transforming them into the array [[1], [3]], and then flatten that array into [1, 3]. If you need the result to be [[1, 3]], just add the array() function:

array(flatten(transform(home.array_a.array_b, x -> x.a)))

2 Comments

Thank you @jxc. It works. To have that [[1, 3]] I would only need to remove the "flatten". Does it make any difference to have array(flatten(transform())) rather than just transform()? Also, I couldn't find transform()'s equivalent in PySpark. Is it under some other name? Thanks
@Averell, with transform() only, we get [[1], [3]], which I think is not what you are looking for? That's why I added the flatten() function. transform() is one of the Spark SQL built-in functions (spark.apache.org/docs/2.4.3/api/sql/index.html). With PySpark DataFrames, we can always use df.selectExpr() or pyspark.sql.functions.expr() to run these SQL functions :). You can google "spark sql higher order functions" for more examples of functions related to array operations.
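
To make the PySpark usage mentioned in that comment concrete, a minimal sketch using pyspark.sql.functions.expr() to run the same SQL expression (the expression string comes from the answer above; the column alias is just illustrative):

from pyspark.sql.functions import expr

# expr() exposes the SQL higher-order functions transform/flatten to the DataFrame API (Spark 2.4+)
mydf1.select(
    expr("flatten(transform(home.array_a.array_b, x -> x.a))").alias("array_field_inside_array")
).show()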

You can simply select it like this:

spark.sql("SELECT home.array_a.array_b[0].a FROM <table>")

3 Comments

Thank you Lamanus. But that would only give the first element of the array array_b.
Then add more cols as you wish.
But I need the struct field "a" from ALL elements in array_b, not all struct fields from the 1st element in array_b.

In your example, it failed because you are trying to print the schema of a value, not a column.

So if you remove "a" from the select statement, you can print the desired schema.

scala> dataDF.select("home.array_a.array_b").printSchema
root
 |-- array_b: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- a: string (nullable = true)
 |    |    |    |-- b: long (nullable = true)

If you want a value from the array (array_b), you need to give the index.

scala> import org.apache.spark.sql.functions.{col, element_at}
scala> dataDF.select(element_at(col("home.array_a.array_b"), 1)).show
+-----------------------------------+
|element_at(home.array_a.array_b, 1)|
+-----------------------------------+
|                           [[1, 2]]|
+-----------------------------------+
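
A PySpark equivalent of the element_at() call above, as a sketch (element_at() requires Spark 2.4+; mydf1 is the question's DataFrame rather than the answer's dataDF):

from pyspark.sql.functions import col, element_at

# element_at() uses 1-based indexing, so this returns only the first array_b entry
mydf1.select(element_at(col("home.array_a.array_b"), 1)).show()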

Can you also give the expected DataFrame?

2 Comments

Thank you Praveen. But that would only give the first element of array_b, which is a struct of two struct fields "a" and "b". What I need is the struct field "a" from ALL elements in array_b (which should be [["1", "3"]] in my sample JSON).
Your output is a DataFrame where each record is an array of structs/tuples [(1, 2)], while I need a DataFrame where each record is an array of arrays of strings [["1", "3"]].
