Вопрос: Доступ к столбцу в фреймворке с использованием Spark


Я работаю над версией SPARK 1.6.1 с использованием SCALA и сталкиваюсь с необычной проблемой. При создании нового столбца с использованием существующего столбца, созданного во время одного и того же выполнения, получается «org.apache.spark.sql.AnalysisException».
ЗА РАБОТОЙ: ,

 val resultDataFrame = dataFrame.withColumn("FirstColumn",lit(2021)).withColumn("SecondColumn",when($"FirstColumn" - 2021 === 0, 1).otherwise(10))
    resultDataFrame.printSchema().

НЕ РАБОТАЕТ  

val resultDataFrame = dataFrame.withColumn("FirstColumn",lit(2021)).withColumn("SecondColumn",when($"FirstColumn" - **max($"FirstColumn")** === 0, 1).otherwise(10))
resultDataFrame.printSchema().

Здесь я создаю свою SecondColumn, используя FirstColumn, созданную во время того же выполнения. Вопрос в том, почему он не работает при использовании функций avg / max. Пожалуйста, дайте мне знать, как я могу решить эту проблему.


4


источник


Ответы:


Если вы хотите использовать агрегатные функции вместе с «нормальными» столбцами, функции должны появиться после groupBy или с условием определения окна. Из этих случаев они не имеют никакого смысла. Примеры:

val result = df.groupBy($"col1").max("col2").as("max") // This works

В приведенном выше случае результирующий DataFrame будет иметь как столбцы «col1», так и «max».

val max = df.select(min("col2"), max("col2")) 

Это работает, потому что в запросе есть только агрегированные функции. Однако следующее не будет работать:

val result = df.filter($"col1" === max($"col2"))

потому что я пытаюсь смешивать не агрегированный столбец с агрегированным столбцом.

Если вы хотите сравнить столбец с агрегированным значением, вы можете попробовать присоединиться:

val maxDf = df.select(max("col2").as("maxValue"))
val joined = df.join(maxDf)
val result = joined.filter($"col1" === $"maxValue").drop("maxValue")

Или даже используйте простое значение:

val maxValue = df.select(max("col2")).first.get(0)
val result = filter($"col1" === maxValue)

2