How to update values from PCollection. #1

Open
SnoozingSimian opened this issue Mar 17, 2023 · 12 comments

@SnoozingSimian

SnoozingSimian commented Mar 17, 2023

Hey, I have been trying to use your library (which is very useful, by the way) to update a few records in my database. Here is a snippet of code that should give some insight into what I am trying to achieve.

import apache_beam as beam
from beam_postgres.io import WriteToPostgres
from dataclasses import dataclass

DB_USERNAME = <db user name>
DB_PASSWORD = <db pass>
DB_HOST = <db host>
DB_PORT = <db port>
DB_DATABASENAME = <db name>

@dataclass
class DocId:
    document_id: str

class ProcessVals(beam.DoFn):
    def process(self, value):
        if value is None:
            yield DocId(None)
        else:
            yield DocId(value)

def check_if_none(value):
    if value.document_id is None:
        return False
    else :
        return True
    
with beam.Pipeline() as p:
    data = p | "Creating" >> beam.Create(
        ['02267a6d-0a9f-40bf-9051-4971961cb0ac', 
         '05db919e-adda-41c2-9197-54623dff6d1a', 
         '04c607ec-64e8-420e-b5a5-bb63e035ba2f', 
         None, 
         None]
       )
    
    data2 = data | "Molding" >> beam.ParDo(ProcessVals()) | "Filter" >> beam.Filter(check_if_none)
    
    data2 | "Writing example records to database" >> WriteToPostgres(
        conninfo = f"host={DB_HOST} dbname={DB_DATABASENAME} user={DB_USERNAME} password={DB_PASSWORD}",
        statement ="UPDATE documents SET is_available = true WHERE document_id = %s",
        )

Unfortunately, this does not seem to be working: the pipeline runs fine, but I do not see the changes reflected in my database. I am really new to the apache-beam space and I am having trouble troubleshooting this. Could you help?

@medzin medzin self-assigned this Mar 17, 2023
@medzin medzin added the question Further information is requested label Mar 17, 2023
@medzin
Owner

medzin commented Mar 17, 2023

Hi, thanks for using the library! I would start by adding error logging, just like in this example: https://github.com/medzin/beam-postgres/blob/main/examples/write_error.py#L31

Also, it looks like there is a typo in the process method of the ProcessVals class:

if value is not None:
    yield DocId(None)

This DoFn will always create DocId objects with document_id set to None, so filtering will always return an empty collection.

@SnoozingSimian
Author

SnoozingSimian commented Mar 18, 2023

Thank you for your prompt reply. Yes, that seems to have been the case. As you might guess, this is just an imitation of the actual code I am trying to run. Let me recheck whether I have made the same mistake in my main code.

I tried to log the error and here is the error I got.

(DocId(document_id='0230d273-ca7f-42d4-bf2d-5552d8ec9efa'), ProgrammingError('the query has 1 placeholders but 0 parameters were passed'))

It seems like I am passing the correct object and it still looks like it does not recognise the parameter.
For reference, the query I am using is this.

UPDATE documents SET is_available = true WHERE document_id = %s
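
A minimal sketch of a pre-write check that surfaces this kind of mismatch before the database driver does. It assumes the writer derives query parameters from each data class with `dataclasses.astuple`, and it counts `%s` placeholders naively; `check_params` is a hypothetical helper for illustration, not part of beam-postgres.

```python
from dataclasses import astuple, dataclass


@dataclass
class DocId:
    document_id: str


def check_params(statement: str, record) -> tuple:
    """Hypothetical helper: compare a naive %s count with the record's fields."""
    # Assumption: parameters are derived with astuple().
    params = astuple(record)
    expected = statement.count("%s")
    if len(params) != expected:
        raise ValueError(
            f"statement has {expected} placeholders but the record "
            f"yields {len(params)} parameters: {record!r}"
        )
    return params


STATEMENT = "UPDATE documents SET is_available = true WHERE document_id = %s"
params = check_params(STATEMENT, DocId("0230d273-ca7f-42d4-bf2d-5552d8ec9efa"))
print(params)
```

If the record looks correct when printed but the check still raises, the conversion from data class to tuple is the likelier culprit than the record's values.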

@medzin
Owner

medzin commented Mar 18, 2023

The error message suggests that document_id is set to None. You could try printing data classes before writing with beam.Map(print).

@SnoozingSimian
Author

But the error message shows the data class value, right? The document ID is clearly set to a valid value.

@medzin
Owner

medzin commented Mar 18, 2023

What runner are you using to run the pipeline (e.g. DataflowRunner)?

@SnoozingSimian
Author

Right now I'm testing with the direct runner, but eventually I'll be using the Dataflow runner.

@medzin
Owner

medzin commented Mar 18, 2023

I experienced a few problems with data class serialization on Dataflow: the fields appeared to be set, but some class metadata was lost after deserialization, and the function call here returned an empty tuple. Data classes only work without problems when I put my models in a dedicated Python package outside the pipeline code. I use apache-beam[gcp]==2.42.0 and Dataflow Flex Templates.

@SnoozingSimian
Author

The issue is that I am not even using Dataflow yet, just the plain old direct runner. I see that the code example above actually updates the required documents. In my main code, I am using the exact same environment (apache-beam==2.46.0 and Python 3.10.8), as well as the same data class and query, so I am not sure why the two behave differently.

@medzin
Owner

medzin commented Mar 19, 2023

Well, I'm using Python 3.9.16, which could be the difference causing the problem. Still, I suggest you put the data class in a dedicated Python package and check if it removes the problematic behavior.

@SnoozingSimian
Author

Okay, let me try that. I'll report the results.

@SnoozingSimian
Author

Hey, so what I did was use tuples instead of data classes. It seems to work now, although I am not entirely sure what was causing the line you mentioned to fail.
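
A rough sketch of the tuple-based workaround described above, written as plain Python so it runs without a pipeline. `to_params` is a hypothetical name. In a Beam pipeline the same logic could presumably be expressed as a `beam.Filter` followed by a `beam.Map` that wraps each ID in a one-element tuple.

```python
from typing import Iterable, Iterator, Optional, Tuple


def to_params(values: Iterable[Optional[str]]) -> Iterator[Tuple[str]]:
    """Hypothetical helper: drop None values, wrap each ID in a 1-tuple."""
    for value in values:
        if value is not None:
            # One tuple item per %s placeholder in the UPDATE statement.
            yield (value,)


raw_ids = [
    "02267a6d-0a9f-40bf-9051-4971961cb0ac",
    None,
    "05db919e-adda-41c2-9197-54623dff6d1a",
]
rows = list(to_params(raw_ids))
print(rows)
```

Plain tuples carry no class metadata, so they sidestep whatever is lost when a data class defined in the main session is serialized.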

@medzin
Owner

medzin commented Mar 21, 2023

The problem is that astuple returns an empty tuple for data classes defined in the main session. It looks like the __dataclass_fields__ attribute, which is used inside the astuple implementation, is somehow corrupted after the serialization/deserialization process.
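
The symptom can be imitated with the standard library alone. The sketch below is a simulation, not Beam's actual serialization path. It assumes the corruption amounts to the fields' internal marker (the private `_field_type` attribute in CPython's `dataclasses` module) no longer being the original sentinel object, which the thread does not confirm.

```python
import copy
from dataclasses import astuple, dataclass, fields


@dataclass
class DocId:
    document_id: str


doc = DocId("02267a6d-0a9f-40bf-9051-4971961cb0ac")
before = astuple(doc)

# SIMULATION ONLY (assumption): swap each field's private marker for a copy.
# The copy looks the same but is a different object, so the identity check
# inside dataclasses.fields() no longer recognises the field.
for f in DocId.__dataclass_fields__.values():
    f._field_type = copy.copy(f._field_type)

after = astuple(doc)

print(before)              # one-element tuple holding the document ID
print(after)               # empty tuple: no fields are recognised any more
print(len(fields(DocId)))  # 0
print(doc)                 # repr still shows document_id with its value
```

This would be consistent with the logged error, where the data class prints with a valid `document_id` while the query receives zero parameters.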
