Skip to content

Commit

Permalink
feat: updating bq set features to use SDK instead of REST API
Browse files Browse the repository at this point in the history
  • Loading branch information
kmaphoenix committed Nov 22, 2024
1 parent 5b28070 commit 1193056
Showing 1 changed file with 123 additions and 45 deletions.
168 changes: 123 additions & 45 deletions src/dfcx_scrapi/core/agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
# limitations under the License.

import logging
import requests
from typing import Dict, List
from google.cloud.dialogflowcx_v3beta1 import services
from google.cloud.dialogflowcx_v3beta1 import types
from google.protobuf import field_mask_pb2
from google.cloud import bigquery

from dfcx_scrapi.core import scrapi_base
from dfcx_scrapi.core import environments
Expand Down Expand Up @@ -56,6 +56,64 @@ def __init__(
self.client_options = self._set_region(agent_id)

self.langauge_code = language_code
self.bq_client = None
self.bq_table_map = None

def modify_bq_settings_kwargs(
self,
agent_id: str,
bq_settings: Dict[str, str],
enable_stackdriver_logging: bool = None,
enable_interaction_logging: bool = None,
bigquery_enabled: bool = None,
bigquery_table: str = None
):
"""Map the input kwargs from the method to the bq_settings payload."""
# Update each of the kwargs as provided
if enable_stackdriver_logging is not None:
bq_settings["advanced_settings"]["logging_settings"][
"enable_stackdriver_logging"] = enable_stackdriver_logging

if enable_interaction_logging is not None:
bq_settings["advanced_settings"]["logging_settings"][
"enable_interaction_logging"] = enable_interaction_logging

if bigquery_enabled is not None:
bq_settings["bigquery_export_settings"][
"enabled"] = bigquery_enabled

if bigquery_table is not None:
if not self.bq_table_map:
self.bq_table_map = self.get_bq_dataset_table_map(agent_id)

table_id = self.bq_table_map.get(bigquery_table, None)
if not table_id:
raise KeyError(
f"Table `{bigquery_table}` does not exist in project.")
bq_settings["bigquery_export_settings"]["bigquery_table"] = table_id

return bq_settings

def get_bq_dataset_table_map(self, agent_id: str) -> Dict[str, str]:
"""Get all dataset/tables accessible from the provided agent_id."""
parts = self._parse_resource_path("agent", agent_id)
project_id = parts.get("project", None)

if not self.bq_client:
self.bq_client = bigquery.Client(
project=project_id,
credentials=self.creds
)

bq_table_map = {}
dataset_id_format = f"projects/{project_id}/datasets/{{}}/tables/{{}}"

for dataset in self.bq_client.list_datasets(project=project_id):
for table in self.bq_client.list_tables(dataset):
bq_table_map[table.table_id] = dataset_id_format.format(
dataset.dataset_id, table.table_id)

return bq_table_map

@scrapi_base.api_call_counter_decorator
def _list_agents_client_request(self, location_id) -> List[
Expand Down Expand Up @@ -642,65 +700,85 @@ def get_bq_settings(
if not agent_id:
agent_id = self.agent_id

client_options = self._set_region(agent_id)
headers = self._set_request_headers(client_options)
agent = self.get_agent(agent_id)

response = requests.get(
f"https://{client_options['api_endpoint']}/v3beta1/{agent_id}",
headers=headers
)

data = self._handle_requests_response(response)
if not data:
return None

return {
'advancedSettings': {
'loggingSettings': data['advancedSettings']['loggingSettings']
"advanced_settings": {
"logging_settings": {
"enable_stackdriver_logging": agent.advanced_settings.logging_settings.enable_stackdriver_logging,
"enable_interaction_logging": agent.advanced_settings.logging_settings.enable_interaction_logging
}
},
'bigqueryExportSettings': data['bigqueryExportSettings']
"bigquery_export_settings": {
"enabled": agent.bigquery_export_settings.enabled,
"bigquery_table": agent.bigquery_export_settings.bigquery_table
}
}

@scrapi_base.api_call_counter_decorator
def update_bq_settings(
self, agent_id: str = None, bq_settings: dict = None) -> str:
self,
agent_id: str = None,
bq_settings: Dict[str, str] = None,
enable_stackdriver_logging: bool = None,
enable_interaction_logging: bool = None,
bigquery_enabled: bool = None,
bigquery_table: str = None,
**kwargs
) -> types.Agent:
"""Update BigQuery Interaction Logging settings for an agent.
Args:
agent_id: CX Agent ID string in the following format
projects/<PROJECT ID>/locations/<LOCATION ID>/agents/<AGENT ID>
bq_settings: The BQ interaction logging settings for the agent.
bq_settings.advancedSettings.loggingSettings.enableStackdriverLogging:
The flag to enable cloud logging.
bq_settings.advancedSettings.loggingSettings.enableInteractionLogging:
The flag to enable conversation history logging.
bq_settings.bigqueryExportSettings.enabled: The flag to enable
BigQuery interaction logging exports.
bq_settings.bigqueryExportSettings.bigqueryTable: The BigQuery
interaction logging table path in the form of:
`projects/<GCP PROJECT ID>/datasets/<DATASET ID>/tables/<TABLE ID>`
bq_settings: (Optional) The BQ interaction logging settings for the
agent. Use `get_bq_settings` to get the proper input format for
this option.
enable_stackdriver_logging: (Optional) Boolean indicating whether
Stackdriver logging is enabled.
enable_interaction_logging: (Optional) Boolean indicating whether
interaction logging is enabled.
biqquery_enabled: (Optional) Boolean indicating whether BigQuery
export is enabled.
bigquery_table: (Optional) The name of the BigQuery table to export
interaction logs to. Provide only the table name, the full path
will be built automatically.
Returns:
String "Agent '(agent_id)' successfully updated."
Agent object with updated settings.
"""
BIGQUERY_INPUTS = [
enable_stackdriver_logging,
enable_interaction_logging,
bigquery_enabled,
bigquery_table
]

if not agent_id:
agent_id = self.agent_id
if not bq_settings:
raise ValueError('bq_settings must be specified')

client_options = self._set_region(agent_id)
headers = self._set_request_headers(client_options)
update_mask = 'advancedSettings.loggingSettings,bigqueryExportSettings'

response = requests.patch(
f"https://{client_options['api_endpoint']}/v3beta1/{agent_id}?update_mask={update_mask}",
headers=headers,
json=bq_settings
)

data = self._handle_requests_response(response)
if not data:
return None

if not bq_settings and not any(BIGQUERY_INPUTS):
raise ValueError(
"At least one setting must be provided if 'bq_settings' is"\
" not used.")

# If the user doesn't provide the full bq_settings payload, we'll get
# the existing payload
elif not bq_settings:
bq_settings = self.get_bq_settings(agent_id)
bq_settings = self.modify_bq_settings_kwargs(
agent_id=agent_id,
bq_settings=bq_settings,
enable_stackdriver_logging=enable_stackdriver_logging,
enable_interaction_logging=enable_interaction_logging,
bigquery_enabled=bigquery_enabled,
bigquery_table=bigquery_table
)

agent = self.update_agent(
agent_id,
advanced_settings = bq_settings["advanced_settings"],
bigquery_export_settings = bq_settings["bigquery_export_settings"]
)

return f"Agent '{agent_id}' successfully updated."

return agent

0 comments on commit 1193056

Please sign in to comment.