Welcome to Vigges Developer Community - Open, Learning, Share
Welcome To Ask or Share your Answers For Others

json - Nested grouping and reducing using Spark Python

I'm trying to do a nested grouping of my data: group by UserID first, then by CampaignID within each user, and then by MetricID within each campaign.

I have the following DataFrame structure.

+----------+--------+------+
|CampaignID|MetricID|UserID|
+----------+--------+------+
|         3|       1|     1|
|         4|       3|     3|
|         4|       2|     3|
|         3|       2|     2|
|         2|       3|     3|
+----------+--------+------+

I wrote the following code to group the rows by UserID:

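# Group the DataFrame's underlying RDD of Rows by UserID,
# then turn each group's iterable into a plain list.
rdd = newDf.rdd
new = rdd.groupBy(lambda x: x["UserID"]).map(lambda x: (x[0], list(x[1])))
new.take(5)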

Output:

[('1',
  [Row(CampaignID='3', MetricID='1', UserID='1'),
   Row(CampaignID='2', MetricID='1', UserID='1'),
   Row(CampaignID='1', MetricID='3', UserID='1')])
]

Please note that I have 10k records. So far I have grouped the data by UserID. I'm trying to figure out how to group each user's rows further by CampaignID and then by MetricID, and then count the records that share the same MetricID, producing JSON like the following:

[{
  "UserID" : "1",
  "data" : [{
    "CampaignID" : "1",
    "data" : [{
      "MetricID" : "1",
      "Count" : "5"
    }]
  }]
}]

There are multiple CampaignIDs and multiple metrics. My idea is to group first and then reduce, counting the rows that share the same MetricID. Any ideas or code examples would be useful.



1 Answer


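You can chain several groupBy calls with collect_list to build the nested structure level by level: first count the rows per (UserID, CampaignID, MetricID), then collect the metric counts per campaign, then collect the campaigns per user. Finally, use to_json to convert the result to a JSON string.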

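import pyspark.sql.functions as F

result = df.groupBy(
    # innermost level: count the rows for each (user, campaign, metric) triple
    'UserID', 'CampaignID', 'MetricID'
).count().groupBy(
    'UserID', 'CampaignID'
).agg(
    # one list of {MetricID, count} structs per campaign
    F.collect_list(F.struct('MetricID', 'count')).alias('data')
).groupBy(
    'UserID'
).agg(
    # one list of {CampaignID, data} structs per user
    F.collect_list(F.struct('CampaignID', 'data')).alias('data')
).agg(
    # gather all users into a single array and serialize it as a JSON string
    F.to_json(F.collect_list(F.struct('UserID', 'data'))).alias('result')
)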

result.show(truncate=False)
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|result                                                                                                                                                                                                                                                                                                  |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[{"UserID":1,"data":[{"CampaignID":3,"data":[{"MetricID":1,"count":1}]}]},{"UserID":3,"data":[{"CampaignID":2,"data":[{"MetricID":3,"count":1}]},{"CampaignID":4,"data":[{"MetricID":3,"count":1},{"MetricID":2,"count":1}]}]},{"UserID":2,"data":[{"CampaignID":3,"data":[{"MetricID":2,"count":1}]}]}]|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
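
To sanity-check the nesting on a small sample without a Spark session, the same count-then-nest logic can be sketched in plain Python. This is only an illustration, not part of the Spark solution: the helper name nest_counts is hypothetical, and the rows are the five sample rows from the question.

```python
import json
from collections import Counter

# Sample rows copied from the question's DataFrame: (CampaignID, MetricID, UserID).
rows = [(3, 1, 1), (4, 3, 3), (4, 2, 3), (3, 2, 2), (2, 3, 3)]


def nest_counts(rows):
    """Hypothetical helper: count rows per (UserID, CampaignID, MetricID) and nest them."""
    # Innermost level: one counter entry per (user, campaign, metric) triple.
    counts = Counter((user, campaign, metric) for campaign, metric, user in rows)

    # Regroup the flat counts into user -> campaign -> list of metric entries.
    # Sorting the keys makes the output order deterministic.
    nested = {}
    for (user, campaign, metric), n in sorted(counts.items()):
        nested.setdefault(user, {}).setdefault(campaign, []).append(
            {"MetricID": metric, "count": n}
        )

    # Convert the dictionaries into the list-of-objects layout of the target JSON.
    return [
        {
            "UserID": user,
            "data": [
                {"CampaignID": campaign, "data": metrics}
                for campaign, metrics in campaigns.items()
            ],
        }
        for user, campaigns in nested.items()
    ]


expected = nest_counts(rows)
print(json.dumps(expected, indent=2))
```

To compare this against the Spark output, the JSON string can be parsed with json.loads(result.first()['result']). collect_list does not guarantee element order, so compare the two structures after sorting each level rather than with a direct equality check. Note also that the Spark result uses the key count with integer values, while the JSON in the question uses Count with string values; a rename (for example withColumnRenamed('count', 'Count')) and a cast to string before the first collect_list would be needed to match it exactly.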
