Issue
I have almost finished what I want to do, but the method that converts the data to a JSON object doesn't solve what is still missing. I want the same output, but "add" and "firsts" will contain more entries, so I need them to be arrays of objects.
My code:
// Imports needed outside spark-shell; assumes a SparkSession named spark
import spark.implicits._
import org.apache.spark.sql.functions.collect_list
case class FirstIdentity(docType: String, docNumber: String, pId: String)
case class SecondIdentity(firm: String, code: String, orgType: String,
                          orgNumber: String, typee: String, perms: Seq[String])
case class General(id: Int, pName: String, description: String, add: Seq[SecondIdentity],
                   delete: Seq[String], act: String, firsts: Seq[FirstIdentity])
val someDF = Seq(
  ("0010XR_TYPE_6", "0010XR", "222222", "6", "TYPE", "77444478", "6", 123, 1, "PF 1", "name", "description",
   Seq("PERM1", "PERM2"))
).toDF("firm", "code", "org_number", "org_type", "type", "doc_number",
  "doc_type", "id", "p_id", "p_name", "name", "description", "perms")
someDF.createOrReplaceTempView("vw_test")
val filter = spark.sql("""
select
firm, code, org_number, org_type, type, doc_number,
doc_type, id, p_id, p_name, name, description, perms
from vw_test
""")
val group =
  filter.rdd.map(x => {
    (
      x.getInt(x.fieldIndex("id")),
      x.getString(x.fieldIndex("p_name")),
      x.getString(x.fieldIndex("description")),
      SecondIdentity(
        x.getString(x.fieldIndex("firm")),
        x.getString(x.fieldIndex("code")),
        x.getString(x.fieldIndex("org_type")),
        x.getString(x.fieldIndex("org_number")),
        x.getString(x.fieldIndex("type")),
        x.getSeq(x.fieldIndex("perms"))
      ),
      "act",
      FirstIdentity(
        x.getString(x.fieldIndex("doc_number")),
        x.getString(x.fieldIndex("doc_type")),
        x.getInt(x.fieldIndex("p_id")).toString
      )
    )
  })
  .toDF("id", "name", "desc", "add", "actKey", "firsts")
  .groupBy("id", "name", "desc", "add", "actKey", "firsts")
  .agg(collect_list("add").as("null"))
  .drop("null")
group.toJSON.show(false)
Result:
{
"id": 123,
"name": "PF 1",
"desc": "description",
"add": {
"firm": "0010XR_TYPE_6",
"code": "0010XR",
"orgType": "6",
"orgNumber": "222222",
"typee": "TYPE",
"perms": [
"PERM1",
"PERM2"
]
},
"actKey": "act",
"firsts": {
"docType": "77444478",
"docNumber": "6",
"pId": "1"
}
}
I want both "add" and "firsts" to be arrays, like this:
EDIT
{
"id": 123,
"name": "PF 1",
"desc": "description",
"add": [ <----
{
"firm": "0010XR_TYPE_6",
"code": "0010XR",
"orgType": "6",
"orgNumber": "222222",
"typee": "TYPE",
"perms": [
"PERM1",
"PERM2"
]
},
{
"firm": "0010XR_TYPE_6",
"code": "0010XR",
"orgType": "5",
"orgNumber": "11111",
"typee": "TYPE2",
"perms": [
"PERM1",
"PERM2"
]
}
],
"actKey": "act",
"firsts": [ <----
{
"docType": "77444478",
"docNumber": "6",
"pId": "1"
},
{
"docType": "411133",
"docNumber": "6",
"pId": "2"
}
]
}
Solution
Based on your comment, you want to aggregate "add" over a grouping. Check the columns you group by: a column you want to aggregate cannot also be a grouping column. If it is, every distinct value forms its own group and you will always get separate records. Group only by id, name, desc and actKey; "firsts" can be collected the same way with collect_list("firsts") if it should become an array too.
This should work as you expect (I assume):
val group =
  filter.rdd.map(x => {
    (
      x.getInt(x.fieldIndex("id")),
      x.getString(x.fieldIndex("p_name")),
      x.getString(x.fieldIndex("description")),
      SecondIdentity(
        x.getString(x.fieldIndex("firm")),
        x.getString(x.fieldIndex("code")),
        x.getString(x.fieldIndex("org_type")),
        x.getString(x.fieldIndex("org_number")),
        x.getString(x.fieldIndex("type")),
        x.getSeq(x.fieldIndex("perms"))
      ),
      "act",
      FirstIdentity(
        x.getString(x.fieldIndex("doc_number")),
        x.getString(x.fieldIndex("doc_type")),
        x.getInt(x.fieldIndex("p_id")).toString
      )
    )
  })
  .toDF("id", "name", "desc", "add", "actKey", "firsts")
  // Group only by the key columns; "add" is aggregated, not grouped on
  .groupBy("id", "name", "desc", "actKey")
  // Keep the aggregate: aliasing it as "null" and then dropping "null"
  // would discard the collected array. Without an alias Spark names the
  // column collect_list(add), as in the result below.
  .agg(collect_list("add"))
group.toJSON.show(false)
Result:
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"id":123,"name":"PF 1","desc":"description","actKey":"act","collect_list(add)":[{"firm":"0010XR_TYPE_6","code":"0010XR","orgType":"6","orgNumber":"222222","typee":"TYPE","perms":["PERM1","PERM2"]},{"firm":"0010XR_TYPE_5","code":"0010XR","orgType":"5","orgNumber":"222223","typee":"TYPE","perms":["PERM1","PERM2"]}]}|
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
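The grouping rule above can be illustrated without Spark. The following plain-Scala sketch uses hypothetical Add and Record types, with values taken from the result above; Scala's groupBy stands in for Spark's groupBy, and mapping each group's rows to their add values plays the role of collect_list. It is an analogy, not Spark code.

```scala
// Hypothetical minimal types (not Spark): key columns plus the value to collect
case class Add(firm: String, orgType: String)
case class Record(id: Int, name: String, add: Add)

// Two rows with the same key, values taken from the result above
val records = Seq(
  Record(123, "PF 1", Add("0010XR_TYPE_6", "6")),
  Record(123, "PF 1", Add("0010XR_TYPE_5", "5"))
)

// Grouping by every column, including add: the rows differ in add, so each
// lands in its own group and each "collected" list has a single element
val byAllColumns: Seq[Seq[Add]] =
  records.groupBy(identity).values.map(_.map(_.add)).toSeq

// Grouping by the key columns only: both rows share one group and their add
// values end up in one list, analogous to collect_list("add")
val byKeysOnly: Seq[(Int, String, Seq[Add])] =
  records
    .groupBy(r => (r.id, r.name))
    .map { case ((id, name), rs) => (id, name, rs.map(_.add)) }
    .toSeq
```

byAllColumns contains two single-element lists, while byKeysOnly contains one entry whose list holds both add values.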
In your map function you don't wrap FirstIdentity and SecondIdentity in a Seq, so "add" and "firsts" are not converted to arrays.
Change your map function as follows:
filter.rdd.map(x => {
  (
    x.getInt(x.fieldIndex("id")),
    x.getString(x.fieldIndex("p_name")),
    x.getString(x.fieldIndex("description")),
    // Wrapping the struct in a Seq turns "add" into an array of objects
    Seq(SecondIdentity(
      x.getString(x.fieldIndex("firm")),
      x.getString(x.fieldIndex("code")),
      x.getString(x.fieldIndex("org_type")),
      x.getString(x.fieldIndex("org_number")),
      x.getString(x.fieldIndex("type")),
      x.getSeq(x.fieldIndex("perms"))
    )),
    "act",
    // Same for "firsts". As in the question, doc_number is passed as
    // docType and doc_type as docNumber; swap them if that is unintended.
    Seq(FirstIdentity(
      x.getString(x.fieldIndex("doc_number")),
      x.getString(x.fieldIndex("doc_type")),
      x.getInt(x.fieldIndex("p_id")).toString
    ))
  )
})
This will result in:
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"id":123,"name":"PF 1","desc":"description","add":[{"firm":"0010XR_TYPE_6","code":"0010XR","orgType":"6","orgNumber":"222222","typee":"TYPE","perms":["PERM1","PERM2"]}],"actKey":"act","firsts":[{"docType":"77444478","docNumber":"6","pId":"1"}]}|
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
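Wrapping in a Seq gives one-element arrays per input row. To merge several rows into one record where both "add" and "firsts" are arrays, the two ideas can be combined: group by the key columns only and collect both values. A plain-Scala sketch of that shape follows, reusing the question's case classes; FlatRow, Grouped and groupRows are hypothetical names, and the two input rows are taken from the desired output in the question.

```scala
// Case classes from the question
case class FirstIdentity(docType: String, docNumber: String, pId: String)
case class SecondIdentity(firm: String, code: String, orgType: String,
                          orgNumber: String, typee: String, perms: Seq[String])

// Hypothetical: one flat row per input record, as the question's map produces
case class FlatRow(id: Int, name: String, desc: String,
                   add: SecondIdentity, actKey: String, first: FirstIdentity)

// Hypothetical: the desired output shape, with add and firsts as lists
case class Grouped(id: Int, name: String, desc: String,
                   add: Seq[SecondIdentity], actKey: String,
                   firsts: Seq[FirstIdentity])

// Group only by the key columns, then collect add and firsts per group
// (the plain-collections analogue of collect_list on both columns)
def groupRows(rows: Seq[FlatRow]): Seq[Grouped] =
  rows
    .groupBy(r => (r.id, r.name, r.desc, r.actKey))
    .map { case ((id, name, desc, actKey), rs) =>
      Grouped(id, name, desc, rs.map(_.add), actKey, rs.map(_.first))
    }
    .toSeq

val perms = Seq("PERM1", "PERM2")
val rows = Seq(
  FlatRow(123, "PF 1", "description",
    SecondIdentity("0010XR_TYPE_6", "0010XR", "6", "222222", "TYPE", perms),
    "act", FirstIdentity("77444478", "6", "1")),
  FlatRow(123, "PF 1", "description",
    SecondIdentity("0010XR_TYPE_6", "0010XR", "5", "11111", "TYPE2", perms),
    "act", FirstIdentity("411133", "6", "2"))
)

val result = groupRows(rows)
```

In Spark the corresponding step would be .groupBy("id", "name", "desc", "actKey").agg(collect_list("add").as("add"), collect_list("firsts").as("firsts")) on the DataFrame built from the map without the Seq wrapping. Unlike the sequential groupBy here, Spark's collect_list does not guarantee element order after a shuffle.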
Answered By – Mrunal Badhe
Answer Checked By – Gilberto Lyons (FixError Admin)