/bin/bash: bad substitution #3044

Open · Johnbathappully opened this issue Mar 10, 2024 · 0 comments

### Operating System

Windows

I'm getting `/bin/bash: ${inputs.input_container_name}: bad substitution`.
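For context, bash reports "bad substitution" whenever the literal `${inputs.input_container_name}` reaches the shell, because `.` is not valid inside a `${...}` parameter expansion. A quick local repro (illustrative; the exact message format varies by bash version):

```bash
bash -c 'echo ${inputs.input_container_name}'
# /bin/bash: ${inputs.input_container_name}: bad substitution
```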

### Steps to reproduce

This is the custom environment; it runs and executes the code fine when I run it as a plain job:

```python
from azure.ai.ml.entities import Environment
import os

custom_env_name = "feat_env_city"
dependencies_dir = os.getcwd()

pipeline_job_env1 = Environment(
    name=custom_env_name,
    description="Custom environment for feature engineering",
    tags={
        "python": "3.7",
        "pandas": "latest",
        "scikit-learn": "latest",
        "azure-storage-blob": ">=12.10.0",
        "azure-ai-ml": "latest",
    },
    conda_file=os.path.join(dependencies_dir, "environment.yml"),
    image="mcr.microsoft.com/azureml/openmpi3.1.2-ubuntu18.04:20210615.v1",
    version="1",
)

pipeline_job_env1 = ml_client.environments.create_or_update(pipeline_job_env1)

print(
    f"Environment with name {pipeline_job_env1.name} is registered to the workspace; "
    f"the environment version is {pipeline_job_env1.version}"
)
```

This is the environment.yml file listing the packages; there are no dependency conflicts:

```yaml
name: custom-environment-feature-city
channels:
  - anaconda
  - conda-forge
dependencies:
  - python=3.7
  - pip:
      - azure-storage-blob>=12.10.0
      - azure-ai-ml
      - pandas
      - scikit-learn
```

This is to get the correct code directory:

```python
import os

code_dir = "."
```

This is the component registration:

```python
from azure.ai.ml import command
from azure.ai.ml import Input, Output
import os

data_prep_component = command(
    name="data_preprocessing_component_city",
    display_name="Data Preprocessing Component_city",
    description="Performs data preprocessing.",
    inputs={
        "input_container_name": Input(type="string", description="Name of the input container in Blob Storage."),
    },
    outputs={
        "processed_data": Output(type="uri_folder", description="Local output directory for processed data."),
    },
    code=code_dir,
    command=f"""python3 data_prep_azureml.py
--input-container-name ${{inputs.input_container_name}}
--output-dir ${{outputs.processed_data}}""",
    environment=f"{pipeline_job_env1.name}:{pipeline_job_env1.version}",
)

data_prep_component = ml_client.create_or_update(data_prep_component.component)

print(
    f"Component {data_prep_component.name} with version {data_prep_component.version} is registered in the workspace."
)
```
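One detail worth noting here: the `command=` value above is an f-string, and in a Python f-string `{{` and `}}` are escapes for literal braces, so `${{inputs.input_container_name}}` is rendered down to `${inputs.input_container_name}` before AzureML ever sees it, which looks like exactly the string bash then rejects. A minimal sketch of a possible fix, keeping the command as a plain string so the AzureML `${{...}}` placeholders survive (only the environment reference needs interpolation):

```python
# Sketch: a plain (non-f) string preserves the ${{...}} placeholders verbatim.
data_prep_component = command(
    name="data_preprocessing_component_city",
    inputs={"input_container_name": Input(type="string")},
    outputs={"processed_data": Output(type="uri_folder")},
    code=code_dir,
    command=(
        "python3 data_prep_azureml.py "
        "--input-container-name ${{inputs.input_container_name}} "
        "--output-dir ${{outputs.processed_data}}"
    ),
    # Only this value needs Python interpolation, so an f-string is safe here.
    environment=f"{pipeline_job_env1.name}:{pipeline_job_env1.version}",
)
```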

This is the pipeline registration:

```python
from azure.ai.ml import MLClient, dsl
from azure.ai.ml.entities import PipelineJob
from azure.identity import DefaultAzureCredential

@dsl.pipeline(
    compute="TestGpuNer",
    description="A pipeline that processes data and then trains a model.",
)
def my_training_pipeline(input_container_name):
    preprocess_job = data_prep_component(
        input_container_name=input_container_name
    )

    train_job = train_component(
        training_data=preprocess_job.outputs.processed_data
    )

    return {"processed_data": preprocess_job.outputs.processed_data}

pipeline = my_training_pipeline(input_container_name="input")
pipeline_job = ml_client.jobs.create_or_update(
    pipeline,
    experiment_name="citydirectorytest",
)

print(f"Pipeline submitted. View it at: {pipeline_job.studio_url}")
```

Please note I am not sharing the training environment or the training component.

For reference, this is the data_prep_azureml.py file I use for the component registration. The code runs perfectly both from the terminal and as a job.

```python
import os
import pandas as pd
import re
import ast
import io
import argparse
from azure.storage.blob import BlobServiceClient, ContainerClient
from azure.storage.blob import ContentSettings
from azure.identity import DefaultAzureCredential
from azure.ai.ml import MLClient
from azure.ai.ml.entities import Data
from azure.ai.ml.constants import AssetTypes

def get_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--input-container-name", type=str, required=True, help="Input container name")
    parser.add_argument("--output-dir", type=str, required=True, help="Local output directory for processed data")
    return parser.parse_args()

def advanced_tokenize(text):
    """Tokenize text, including punctuation."""
    return re.findall(r'\b\w+\b|[^\s\w]', text)

def basic_tokenize(text):
    """Tokenize text into lowercase words, ignoring punctuation."""
    return re.findall(r'\b\w+\b', text.lower())
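# Example outputs of the two tokenizers, for illustration:
#   advanced_tokenize("12 Main St., baker") -> ['12', 'Main', 'St', '.', ',', 'baker']
#   basic_tokenize("12 Main St., baker")    -> ['12', 'main', 'st', 'baker']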

def string_to_list(string):
    if isinstance(string, list):
        return string

    if pd.isna(string):
        return []

    try:
        return ast.literal_eval(string)
    except (ValueError, SyntaxError):
        print(f"Failed to convert string to list: {string}")
        return []

def extract_second_element(conversion_list):
    """Extracts the second element from a list of tuples."""
    return [element[1] for element in conversion_list]

def convert_to_numerical(analysis):
    """Converts category tags in the analysis to numerical tags."""
    category_to_number = {
        'sequence number': 0,
        'person name': 1,
        'spouse name': 2,
        'occupation': 3,
        'employment place': 4,
        'house number': 5,
        'street': 6,
        'None': 7
    }
    numerical_analysis = [(word, category_to_number[cat]) for word, cat in analysis]
    print(f"Numerical Conversion Result: {numerical_analysis}")
    return numerical_analysis

def analyze_row_with_unique_context(row):
    input_text = row['value']
    categories = {
        'sequence number': row.get('sequence number', []),
        'person name': row.get('person name', []),
        'spouse name': row.get('spouse name', []),
        'occupation': row.get('occupation', []),
        'employment place': row.get('employment place', []),
        'house number': row.get('house number', []),
        'street': row.get('street', [])
    }
    print(f"\nInput Text: {input_text}")
    print("Categories before processing:", categories)

    analysis = []
    for word in input_text:
        found_category = 'None'
        for category, cat_list in categories.items():
            if word.lower() in [x.lower() for x in cat_list]:
                found_category = category
                break
        analysis.append((word, found_category))

    print("Analysis Result:", analysis)
    return analysis

def register_dataset(ml_client, file_path, dataset_name="processed_dataset"):
    data_asset = Data(
        path=file_path,
        type="uri_file",
        description="Processed combined CSV dataset",
        name=dataset_name,
    )

    registered_dataset = ml_client.data.create_or_update(data_asset)

    print(f"Dataset '{dataset_name}' registered successfully.")
    return registered_dataset

def process_and_combine_csv_files_from_blob(input_container_client, output_dir):
    blobs_list = input_container_client.list_blobs()
    combined_df = pd.DataFrame()

    for blob in blobs_list:
        if blob.name.endswith('.csv'):
            blob_client = input_container_client.get_blob_client(blob=blob.name)
            blob_data = blob_client.download_blob().readall()
            blob_str = io.BytesIO(blob_data)
            df = pd.read_csv(blob_str)

            df['value'] = df['value'].apply(lambda x: advanced_tokenize(str(x)) if pd.notna(x) else x)
            for col in ['sequence number', 'person name', 'spouse name', 'occupation', 'employment place', 'house number', 'street']:
                df[col] = df[col].apply(lambda x: basic_tokenize(str(x)) if pd.notna(x) else x)
                df[col] = df[col].apply(string_to_list)
            df['unique_context_analysis'] = df.apply(analyze_row_with_unique_context, axis=1)
            df['conversion'] = df['unique_context_analysis'].apply(convert_to_numerical)
            df['final'] = df['conversion'].apply(extract_second_element)

            df = df[['value', 'final']]
            df.rename(columns={'value': 'tokens', 'final': 'ner_tags'}, inplace=True)
            df.insert(0, 'id', range(1, 1 + len(df)))

            combined_df = pd.concat([combined_df, df], ignore_index=True)

    output_file_path = os.path.join(output_dir, "processed_combined_csv.csv")
    combined_df.to_csv(output_file_path, index=False)
    print(f"\nProcessed and combined CSV file saved locally: {output_file_path}")
    return output_file_path

if __name__ == "__main__":
    args = get_args()
    connection_string = ""

    blob_service_client = BlobServiceClient.from_connection_string(connection_string)
    input_container_client = blob_service_client.get_container_client(args.input_container_name)

    os.makedirs(args.output_dir, exist_ok=True)

    output_file_path = process_and_combine_csv_files_from_blob(input_container_client, args.output_dir)

    credential = DefaultAzureCredential()
    ml_client = MLClient(
        credential=credential,
        subscription_id="",
        resource_group_name="ai_ml",
        workspace_name="AzureML_flow"
    )

    register_dataset(ml_client, output_file_path)
```

Please note I have removed the ml_client subscription details and the connection string key.


### Actual behavior

This is an additional issue, which I resolved.

Please note that a job is taking 1.5 hours to schedule. Also, I am able to register the component via command(), whereas if I try via a YAML configuration it shows:

```
/mnt/azureml/cr/j/46aec26067694da8a26ce7eb7e8b8875/exe/wd
azureml-logs
logs
outputs
user_logs
python: can't open file '/mnt/batch/tasks/shared/LS_root/mounts/clusters/anchestrycpucompute1/code/Users/john.athappully/data_prep_azureml.py': [Errno 2] No such file or directory
```

I also tried running your GitHub code sample and found the same issue when using YAML for the training code, whereas the command()-based data preparation worked. I had to change the training component to the command() approach to make it work.
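For what it's worth, a minimal sketch of the component YAML I would expect to work, assuming data_prep_azureml.py sits in the same directory as the YAML file (the `code:` field controls which folder gets uploaded, so a wrong path there would explain the "No such file or directory" above); the names and versions are taken from the snippets earlier:

```yaml
$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
name: data_preprocessing_component_city
display_name: Data Preprocessing Component_city
type: command
inputs:
  input_container_name:
    type: string
outputs:
  processed_data:
    type: uri_folder
# 'code' must point at the directory that actually contains the script.
code: .
environment: azureml:feat_env_city:1
command: >-
  python3 data_prep_azureml.py
  --input-container-name ${{inputs.input_container_name}}
  --output-dir ${{outputs.processed_data}}
```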
