Skip to content

Commit

Permalink
materialize-boilerplate: produce unsatisfiable constraints for incomp…
Browse files Browse the repository at this point in the history
…atible type changes of ambiguous files

An "ambiguous" field is one where the field name differs only in capitalization,
and the destination system does not differentiate fields based on
capitalization.

Previously we were not correctly evaluating incompatible type changes for this
fields if they were previously in the field selection. This is the fix for that.
  • Loading branch information
williamhbaker committed Nov 15, 2024
1 parent 883a454 commit 8506acb
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 38 deletions.
2 changes: 1 addition & 1 deletion materialize-boilerplate/.snapshots/TestValidate
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ update an existing materialization with ambiguous fields in an incompatbile way:
{"Field":"SECONDBADFIELD","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"Flow collection field 'SECONDBADFIELD' would be materialized as 'secondbadfield', which is ambiguous with fields [secondBadField,secondbadfield]. Only a single field from this set should be selected. Consider using alternate projections if you want to materialize more than one of these fields"}
{"Field":"THIRDBADFIELD","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Flow collection field 'THIRDBADFIELD' is ambiguous with fields already being materialized as 'thirdbadfield' in the destination. Consider using an alternate, unambiguous projection of this field to allow it to be materialized"}
{"Field":"_meta/flow_truncated","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"}
{"Field":"firstBadField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"firstBadField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'firstBadField' is already being materialized as endpoint type 'STRING' but endpoint type 'INTEGER' is required by its schema '{ type: [integer] }'"}
{"Field":"firstbadfield","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Flow collection field 'firstbadfield' is ambiguous with fields already being materialized as 'firstbadfield' in the destination. Consider using an alternate, unambiguous projection of this field to allow it to be materialized"}
{"Field":"flow_document","Type":1,"TypeString":"FIELD_REQUIRED","Reason":"This field is the document in the current materialization"}
{"Field":"flow_published_at","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"}
Expand Down
97 changes: 60 additions & 37 deletions materialize-boilerplate/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,17 @@ func (v Validator) validateMatchesExistingBinding(
// yet been selected.
if existing != nil && slices.Contains(existing.FieldSelection.AllFields(), p.Field) {
// This field has already been selected as the ambiguous field to materialize.
c = &pm.Response_Validated_Constraint{
Type: pm.Response_Validated_Constraint_LOCATION_RECOMMENDED,
Reason: "This location is part of the current materialization",
if existingField, err := v.is.GetField(path, p.Field); err == nil {
// If the field already exists in the destination, make sure
// the new projection is compatible.
if c, err = v.constraintForExistingField(boundCollection, p, existingField, fieldConfigJsonMap); err != nil {
return nil, fmt.Errorf("evaluating constraint for existing ambiguous field: %w", err)
}
} else {
c = &pm.Response_Validated_Constraint{
Type: pm.Response_Validated_Constraint_LOCATION_RECOMMENDED,
Reason: "This location is part of the current materialization",
}
}
} else if existing != nil && v.ambiguousFieldIsSelected(p, existing.FieldSelection.AllFields()) {
// A different field has been selected, so this one can't be.
Expand Down Expand Up @@ -251,40 +259,8 @@ func (v Validator) validateMatchesExistingBinding(
} else if existingField, err := v.is.GetField(path, p.Field); err == nil {
// All other fields that are already being materialized. Any error from GetField is
// because the field does not already exist.
rawConfig := fieldConfigJsonMap[p.Field]
if compatible, err := v.c.Compatible(existingField, &p, rawConfig); err != nil {
return nil, fmt.Errorf("determining compatibility for endpoint field %q vs. selected field %q: %w", existingField.Name, p.Field, err)
} else if compatible {
if p.IsPrimaryKey {
c = &pm.Response_Validated_Constraint{
Type: pm.Response_Validated_Constraint_FIELD_REQUIRED,
Reason: "This field is a key in the current materialization",
}
} else {
// TODO(whb): Really this should be "FIELD_RECOMMENDED", but that is not a
// constraint that has been implemented currently. This would be an issue if there
// are multiple projections of the same location.
c = &pm.Response_Validated_Constraint{
Type: pm.Response_Validated_Constraint_LOCATION_RECOMMENDED,
Reason: "This location is part of the current materialization",
}
}
} else {
newDesc, err := v.c.DescriptionForType(&p, rawConfig)
if err != nil {
return nil, fmt.Errorf("getting description for field %q of bound collection %q: %w", p.Field, boundCollection.Name.String(), err)
}

c = &pm.Response_Validated_Constraint{
Type: pm.Response_Validated_Constraint_UNSATISFIABLE,
Reason: fmt.Sprintf(
"Field '%s' is already being materialized as endpoint type '%s' but endpoint type '%s' is required by its schema '%s'",
p.Field,
strings.ToUpper(existingField.Type),
strings.ToUpper(newDesc),
fieldSchema(p),
),
}
if c, err = v.constraintForExistingField(boundCollection, p, existingField, fieldConfigJsonMap); err != nil {
return nil, err
}
}

Expand Down Expand Up @@ -321,6 +297,53 @@ func (v Validator) validateMatchesExistingBinding(
return constraints, nil
}

func (v Validator) constraintForExistingField(
boundCollection pf.CollectionSpec,
p pf.Projection,
existingField EndpointField,
fieldConfigJsonMap map[string]json.RawMessage,
) (*pm.Response_Validated_Constraint, error) {
var out *pm.Response_Validated_Constraint

rawConfig := fieldConfigJsonMap[p.Field]
if compatible, err := v.c.Compatible(existingField, &p, rawConfig); err != nil {
return nil, fmt.Errorf("determining compatibility for endpoint field %q vs. selected field %q: %w", existingField.Name, p.Field, err)
} else if compatible {
if p.IsPrimaryKey {
out = &pm.Response_Validated_Constraint{
Type: pm.Response_Validated_Constraint_FIELD_REQUIRED,
Reason: "This field is a key in the current materialization",
}
} else {
// TODO(whb): Really this should be "FIELD_RECOMMENDED", but that is not a
// constraint that has been implemented currently. This would be an issue if there
// are multiple projections of the same location.
out = &pm.Response_Validated_Constraint{
Type: pm.Response_Validated_Constraint_LOCATION_RECOMMENDED,
Reason: "This location is part of the current materialization",
}
}
} else {
newDesc, err := v.c.DescriptionForType(&p, rawConfig)
if err != nil {
return nil, fmt.Errorf("getting description for field %q of bound collection %q: %w", p.Field, boundCollection.Name.String(), err)
}

out = &pm.Response_Validated_Constraint{
Type: pm.Response_Validated_Constraint_UNSATISFIABLE,
Reason: fmt.Sprintf(
"Field '%s' is already being materialized as endpoint type '%s' but endpoint type '%s' is required by its schema '%s'",
p.Field,
strings.ToUpper(existingField.Type),
strings.ToUpper(newDesc),
fieldSchema(p),
),
}
}

return out, nil
}

// ambiguousFields determines if the given projection is part of a set of projections that would
// result in ambiguous field names in the destination system. Fields are "ambiguous" if their
// destination treats more than one Flow collection field name as the same materialized field name.
Expand Down

0 comments on commit 8506acb

Please sign in to comment.