diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py index e20d79b83091c..15fe65f5ef5d9 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py @@ -384,7 +384,8 @@ def list_insert_create_queries_sql( target_schema, target_table, username, - querytxt as ddl + query as query_id, + LISTAGG(CASE WHEN LEN(RTRIM(querytxt)) = 0 THEN querytxt ELSE RTRIM(querytxt) END) WITHIN GROUP (ORDER BY sequence) as ddl from ( select @@ -393,7 +394,9 @@ def list_insert_create_queries_sql( sti.table as target_table, sti.database as cluster, usename as username, - querytxt, + text as querytxt, + sq.query, + sequence, si.starttime as starttime from stl_insert as si @@ -401,19 +404,20 @@ def list_insert_create_queries_sql( sti.table_id = tbl left join svl_user_info sui on si.userid = sui.usesysid - left join stl_query sq on + left join STL_QUERYTEXT sq on si.query = sq.query left join stl_load_commits slc on slc.query = si.query where sui.usename <> 'rdsdb' - and sq.aborted = 0 and slc.query IS NULL and cluster = '{db_name}' and si.starttime >= '{start_time}' and si.starttime < '{end_time}' + and cluster = 'dev' ) as target_tables - order by cluster, target_schema, target_table, starttime asc + group by cluster, query_id, target_schema, target_table, username, starttime + order by cluster, query_id, target_schema, target_table, starttime asc """.format( # We need the original database name for filtering db_name=db_name, diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift_schema.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift_schema.py index 88573dce8d96a..2673f6762f603 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift_schema.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift_schema.py @@ -367,7 +367,9 @@ def get_lineage_rows( target_table=row[field_names.index("target_table")] if "target_table" in field_names else None, - ddl=row[field_names.index("ddl")] if "ddl" in field_names else None, + ddl=row[field_names.index("ddl")].replace("\\n", "") + if "ddl" in field_names + else None, filename=row[field_names.index("filename")] if "filename" in field_names else None,