Welcome to Vigges Developer Community: Open, Learning, Share
Welcome To Ask or Share your Answers For Others

python - Pyspark write JSON column to Postgres using AWS Glue

I have the following data frame (df4):

+------------------------------------+------------------------------------------+--------------------------+--------------------------+
|id                                  |exclusion_reason                          |created_at                |updated_at                |
+------------------------------------+------------------------------------------+--------------------------+--------------------------+
|4c01d951-2ec5-4ba4-bfe2-8ba9c3029962|{"ship_reason": "A1", "bill_reason": "A1"}|2021-01-12 16:04:19.673197|2021-01-12 16:04:19.694706|
|dac14ca3-bf44-4e3c-80e8-0e2d6d2ff576|{"ship_reason": "A1", "bill_reason": "A1"}|2021-01-12 16:04:19.673197|2021-01-12 16:04:19.694706|
|6d277012-ff6c-4202-bbd7-64cbd467ca28|{"ship_reason": "A1", "bill_reason": "A1"}|2021-01-12 16:04:19.673197|2021-01-12 16:04:19.694706|
|0388163e-2614-4b71-b707-623337d58387|{"ship_reason": "A1", "bill_reason": "A1"}|2021-01-12 16:04:19.673197|2021-01-12 16:04:19.694706|
|01daec52-408c-44e3-965a-b87daa334a1a|{"ship_reason": "A1", "bill_reason": "A1"}|2021-01-12 16:04:19.673197|2021-01-12 16:04:19.694706|
+------------------------------------+------------------------------------------+--------------------------+--------------------------+

that I need to write to a Postgres database. I am using AWS Glue, and the Postgres database is in a VPC, so I have to write through a Glue connection with the glueContext.write_dynamic_frame.from_jdbc_conf method. I keep getting the error ERROR: column "matchback_exclusion_reason" is of type jsonb but expression is of type character. The column's data type is string in the data frame and JSONB in the database.

I have seen suggestions that I just need to add stringtype: "unspecified" to my write statement, but the following produces the same error:

datasource2 = DynamicFrame.fromDF(df4, glueContext, "ParquetToWrite")

output = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource2, catalog_connection = "MPtest", connection_options = {"stringtype":"unspecified", "database" : "app", "dbtable" : "orders"})

Can I cast this column to JSON somehow? I have tried creating a struct type to parse the elements, but that didn't work either (code below):

from pyspark.sql import functions as f
from pyspark.sql.types import StructType, StructField, StringType

# Parse the JSON string into a struct (this yields a struct column, not JSONB)
schema = StructType([StructField("ship_reason", StringType()), StructField("bill_reason", StringType())])
df4Test = df4.withColumn("exclusion_reason", f.from_json(df4.exclusion_reason, schema))

Is it possible to modify the column type to be a JSONB type? Ideally I would basically just like to "json.load" the exclusion_reason column so I can write it to Postgres.
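
On the "json.load" idea: Spark has no JSON or JSONB column type, so parsing the string will not by itself produce a JSONB column. What parsing can do is confirm that every value is a well-formed JSON object before the write, which rules out bad data as the cause of the error. A minimal sketch in plain Python; find_invalid_json and the sample values are hypothetical, and in a Glue job the same check would have to be run on a collected sample or wrapped in a UDF:

```python
import json


def find_invalid_json(values):
    """Return (index, value) pairs for entries that are not valid JSON objects."""
    bad = []
    for i, value in enumerate(values):
        try:
            parsed = json.loads(value)
        except (TypeError, ValueError):
            # TypeError covers None, ValueError covers malformed JSON text
            bad.append((i, value))
            continue
        if not isinstance(parsed, dict):
            # Valid JSON, but not an object like the rows shown above
            bad.append((i, value))
    return bad


# Hypothetical sample: one good row, one truncated row, one null
sample = [
    '{"ship_reason": "A1", "bill_reason": "A1"}',
    '{"ship_reason": "A1", ',
    None,
]
invalid = find_invalid_json(sample)
```

An empty result means the strings themselves are fine and the remaining problem is only how the JDBC driver types the parameter.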


1 Answer

ResolveChoice may help by casting exclusion_reason to json:

datasource3 = datasource2.resolveChoice(specs = [('exclusion_reason','cast:json')])

From the AWS Glue Developer Guide:

Use ResolveChoice to specify how a column should be handled when it contains values of multiple types. You can choose to either cast the column to a single data type, discard one or more of the types, or retain all types in either separate columns or a structure. You can select a different resolution policy for each column or specify a global policy that is applied to all columns.
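
Since resolveChoice takes one (column, action) tuple per column, the specs list can be built from a list of column names when several columns need the same cast. A minimal sketch, assuming the 'cast:json' action from the answer above is accepted by your Glue version (not verified here); build_cast_specs is a hypothetical helper, not part of the Glue API:

```python
def build_cast_specs(columns, target_type="json"):
    """Build a resolveChoice specs list: one (column, 'cast:<type>') tuple per column."""
    return [(column, f"cast:{target_type}") for column in columns]


specs = build_cast_specs(["exclusion_reason"])

# In a Glue job, where datasource2 is the DynamicFrame from the question:
# datasource3 = datasource2.resolveChoice(specs=specs)
```

If the cast is rejected or the write still fails, the specs list is the only thing that needs to change to try a different action.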

