Skip to content

Commit

Permalink
Update oracle.py
Browse files Browse the repository at this point in the history
  • Loading branch information
acrylJonny committed Nov 15, 2024
1 parent daa506b commit f4a2e01
Showing 1 changed file with 46 additions and 46 deletions.
92 changes: 46 additions & 46 deletions metadata-ingestion/src/datahub/ingestion/source/sql/oracle.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,65 +377,65 @@ def _get_constraint_data(
params = {"table_name": table_name}

# Simplified query that's more reliable with SQLAlchemy 1.4
text = """
SELECT
ac.constraint_name,
ac.constraint_type,
acc.column_name AS local_column,
NULL AS remote_table,
NULL AS remote_column,
NULL AS remote_owner,
acc.position AS loc_pos,
NULL AS rem_pos,
ac.search_condition,
ac.delete_rule
FROM all_constraints ac
JOIN all_cons_columns acc
ON ac.owner = acc.owner
AND ac.constraint_name = acc.constraint_name
AND ac.table_name = acc.table_name
WHERE ac.table_name = :table_name
AND ac.constraint_type IN ('R','P', 'U', 'C')
"""
text = (
"SELECT"
"\nac.constraint_name,"
"\nac.constraint_type,"
"\nacc.column_name AS local_column,"
"\nNULL AS remote_table,"
"\nNULL AS remote_column,"
"\nNULL AS remote_owner,"
"\nacc.position AS loc_pos,"
"\nNULL AS rem_pos,"
"\nac.search_condition,"
"\nac.delete_rule"
"\nFROM all_constraints ac"
"\nJOIN all_cons_columns acc"
"\nON ac.owner = acc.owner"
"\nAND ac.constraint_name = acc.constraint_name"
"\nAND ac.table_name = acc.table_name"
"\nWHERE ac.table_name = :table_name"
"\nAND ac.constraint_type IN ('R','P', 'U', 'C')"
)

if schema is not None:
params["owner"] = schema
text += "\nAND ac.owner = :owner"

# For foreign keys, join with the remote columns
text += """
UNION ALL
SELECT
ac.constraint_name,
ac.constraint_type,
acc.column_name AS local_column,
ac.r_table_name AS remote_table,
rcc.column_name AS remote_column,
ac.r_owner AS remote_owner,
acc.position AS loc_pos,
rcc.position AS rem_pos,
ac.search_condition,
ac.delete_rule
FROM all_constraints ac
JOIN all_cons_columns acc
ON ac.owner = acc.owner
AND ac.constraint_name = acc.constraint_name
AND ac.table_name = acc.table_name
LEFT JOIN all_cons_columns rcc
ON ac.r_owner = rcc.owner
AND ac.r_constraint_name = rcc.constraint_name
AND acc.position = rcc.position
WHERE ac.table_name = :table_name
AND ac.constraint_type = 'R'
"""
text +=(
"\nUNION ALL"
"\nSELECT"
"\nac.constraint_name,"
"\nac.constraint_type,"
"\nacc.column_name AS local_column,"
"\nac.r_table_name AS remote_table,"
"\nrcc.column_name AS remote_column,"
"\nac.r_owner AS remote_owner,"
"\nacc.position AS loc_pos,"
"\nrcc.position AS rem_pos,"
"\nac.search_condition,"
"\nac.delete_rule"
"\nFROM all_constraints ac"
"\nJOIN all_cons_columns acc"
"\nON ac.owner = acc.owner"
"\nAND ac.constraint_name = acc.constraint_name"
"\nAND ac.table_name = acc.table_name"
"\nLEFT JOIN all_cons_columns rcc"
"\nON ac.r_owner = rcc.owner"
"\nAND ac.r_constraint_name = rcc.constraint_name"
"\nAND acc.position = rcc.position"
"\nWHERE ac.table_name = :table_name"
"\nAND ac.constraint_type = 'R'"
)

if schema is not None:
text += "\nAND ac.owner = :owner"

text += "\nORDER BY constraint_name, loc_pos"

rp = self._inspector_instance.bind.execute(sql.text(text), params)
return rp.fetchall()
return list(rp.fetchall())

def get_pk_constraint(
self, table_name: str, schema: Optional[str] = None, dblink: str = ""
Expand Down

0 comments on commit f4a2e01

Please sign in to comment.