diff --git a/crates/validation/tests/scenario_tests.rs b/crates/validation/tests/scenario_tests.rs index 5836fd7d6a..c9451edb0d 100644 --- a/crates/validation/tests/scenario_tests.rs +++ b/crates/validation/tests/scenario_tests.rs @@ -12,22 +12,20 @@ fn test_golden_all_visits() { fn test_dekaf_materialization_inline_config() { let fixture = r##" test://example/catalog.yaml: - collections: - testing/schema_with_properties: - schema: - type: object - properties: - id: { type: string } - required: [id] - key: [/id] materializations: - testing/test_dekaf: + good: endpoint: - dekaf: {} - bindings: - - source: testing/schema_with_properties - resource: {} + dekaf: &config + variant: foo + config: + strict_topic_names: false + bindings: [] driver: + materializations: + good: &connector + connectorType: DEKAF + config: *config + bindings: [] dataPlanes: "1d:1d:1d:1d:1d:1d:1d:1d": default: true @@ -41,24 +39,22 @@ driver: #[test] fn test_dekaf_materialization_indirect_config() { let fixture = r##" -test://example/dekaf.yaml: {} test://example/catalog.yaml: - collections: - testing/schema_with_properties: - schema: - type: object - properties: - id: { type: string } - required: [id] - key: [/id] materializations: - testing/test_dekaf: + good: endpoint: - dekaf: example/dekaf.yaml - bindings: - - source: testing/schema_with_properties - resource: {} + dekaf: &config + variant: foo + config: test://example/dekaf + bindings: [] +test://example/dekaf: + strict_topic_names: false driver: + materializations: + good: &connector + connectorType: DEKAF + config: *config + bindings: [] dataPlanes: "1d:1d:1d:1d:1d:1d:1d:1d": default: true @@ -72,7 +68,6 @@ driver: #[test] fn test_dekaf_materialization_invalid() { let fixture = r##" -test://example/dekaf.yaml: {} test://example/catalog.yaml: collections: testing/schema_with_properties: @@ -83,13 +78,16 @@ test://example/catalog.yaml: required: [id] key: [/id] materializations: - testing/test_dekaf: - endpoint: + bad: + endpoint: &config dekaf: false - bindings: - - source: testing/schema_with_properties - resource: {} + bindings: [] driver: + materializations: + bad: &connector + connectorType: DEKAF + config: *config + bindings: [] dataPlanes: "1d:1d:1d:1d:1d:1d:1d:1d": default: true @@ -103,24 +101,20 @@ driver: #[test] fn test_dekaf_materialization_nonexistent() { let fixture = r##" -test://example/dekaf.yaml: {} test://example/catalog.yaml: - collections: - testing/schema_with_properties: - schema: - type: object - properties: - id: { type: string } - required: [id] - key: [/id] materializations: - testing/test_dekaf: + bad: endpoint: - dekaf: foo/bar - bindings: - - source: testing/schema_with_properties - resource: {} + dekaf: &config + variant: bar + config: foo/bar + bindings: [] driver: + materializations: + good: &connector + connectorType: DEKAF + config: *config + bindings: [] dataPlanes: "1d:1d:1d:1d:1d:1d:1d:1d": default: true diff --git a/crates/validation/tests/snapshots/scenario_tests__dekaf_materialization_indirect_config.snap b/crates/validation/tests/snapshots/scenario_tests__dekaf_materialization_indirect_config.snap index 961ba8d6fa..e6303503d5 100644 --- a/crates/validation/tests/snapshots/scenario_tests__dekaf_materialization_indirect_config.snap +++ b/crates/validation/tests/snapshots/scenario_tests__dekaf_materialization_indirect_config.snap @@ -1,461 +1,39 @@ --- source: crates/validation/tests/scenario_tests.rs -assertion_line: 69 +assertion_line: 65 expression: outcome --- Outcome { built_captures: [], - built_collections: [ - BuiltCollection { - collection: testing/schema_with_properties, - scope: test://example/catalog.yaml#/collections/testing~1schema_with_properties, - control_id: "0000000000000000", - data_plane_id: "1d1d1d1d1d1d1d1d", - expect_pub_id: "0000000000000000", - expect_build_id: "0000000000000000", - model: { - "schema": {"$id":"test://example/catalog.yaml?ptr=/collections/testing~1schema_with_properties/schema","properties":{"id":{"type":"string"}},"required":["id"],"type":"object"}, - "key": [ - "/id" - ] - }, - validated: NULL, - spec: CollectionSpec { - name: "testing/schema_with_properties", - write_schema_json: "{\"$id\":\"test://example/catalog.yaml?ptr=/collections/testing~1schema_with_properties/schema\",\"properties\":{\"id\":{\"type\":\"string\"}},\"required\":[\"id\"],\"type\":\"object\"}", - read_schema_json: "", - key: [ - "/id", - ], - uuid_ptr: "/_meta/uuid", - partition_fields: [], - projections: [ - Projection { - ptr: "/_meta/flow_truncated", - field: "_meta/flow_truncated", - explicit: false, - is_partition_key: false, - is_primary_key: false, - inference: Some( - Inference { - types: [ - "boolean", - ], - string: None, - title: "Flow truncation indicator", - description: "Indicates whether any of the materialized values for this row have been truncated to make them fit inside the limitations of the destination system.", - default_json: "", - secret: false, - exists: Must, - numeric: None, - }, - ), - }, - Projection { - ptr: "", - field: "flow_document", - explicit: false, - is_partition_key: false, - is_primary_key: false, - inference: Some( - Inference { - types: [ - "object", - ], - string: None, - title: "", - description: "", - default_json: "", - secret: false, - exists: Must, - numeric: None, - }, - ), - }, - Projection { - ptr: "/_meta/uuid", - field: "flow_published_at", - explicit: false, - is_partition_key: false, - is_primary_key: false, - inference: Some( - Inference { - types: [ - "string", - ], - string: Some( - String { - content_type: "", - format: "date-time", - content_encoding: "uuid", - max_length: 0, - }, - ), - title: "Flow Publication Time", - description: "Flow publication date-time of this document", - default_json: "", - secret: false, - exists: Must, - numeric: None, - }, - ), - }, - Projection { - ptr: "/id", - field: "id", - explicit: false, - is_partition_key: false, - is_primary_key: true, - inference: Some( - Inference { - types: [ - "string", - ], - string: Some( - String { - content_type: "", - format: "", - content_encoding: "", - max_length: 0, - }, - ), - title: "", - description: "", - default_json: "", - secret: false, - exists: Must, - numeric: None, - }, - ), - }, - ], - ack_template_json: "{\"_meta\":{\"ack\":true,\"uuid\":\"DocUUIDPlaceholder-329Bb50aa48EAa9ef\"}}", - partition_template: Some( - JournalSpec { - name: "testing/schema_with_properties/2020202020202020", - replication: 3, - labels: Some( - LabelSet { - labels: [ - Label { - name: "app.gazette.dev/managed-by", - value: "estuary.dev/flow", - prefix: false, - }, - Label { - name: "content-type", - value: "application/x-ndjson", - prefix: false, - }, - Label { - name: "estuary.dev/build", - value: "2121212121212121", - prefix: false, - }, - Label { - name: "estuary.dev/collection", - value: "testing/schema_with_properties", - prefix: false, - }, - ], - }, - ), - fragment: Some( - Fragment { - length: 536870912, - compression_codec: Gzip, - stores: [ - "s3://a-bucket/", - ], - refresh_interval: Some( - Duration { - seconds: 300, - nanos: 0, - }, - ), - retention: None, - flush_interval: None, - path_postfix_template: "utc_date={{.Spool.FirstAppendTime.Format \"2006-01-02\"}}/utc_hour={{.Spool.FirstAppendTime.Format \"15\"}}", - }, - ), - flags: 4, - max_append_rate: 4194304, - }, - ), - derivation: None, - }, - previous_spec: NULL, - is_touch: 0, - dependency_hash: NULL, - }, - ], + built_collections: [], built_materializations: [ BuiltMaterialization { - materialization: testing/test_dekaf, - scope: test://example/catalog.yaml#/materializations/testing~1test_dekaf, + materialization: good, + scope: test://example/catalog.yaml#/materializations/good, control_id: "0000000000000000", data_plane_id: "1d1d1d1d1d1d1d1d", expect_pub_id: "0000000000000000", expect_build_id: "0000000000000000", model: { "endpoint": { - "dekaf": "example/dekaf.yaml" - }, - "bindings": [ - { - "resource": {}, - "source": "testing/schema_with_properties", - "fields": { - "recommended": true - } + "dekaf": { + "variant": "foo", + "config": "test://example/dekaf" } - ] + }, + "bindings": [] }, validated: Validated { - bindings: [ - Binding { - constraints: { - "_meta/flow_truncated": Constraint { - r#type: FieldOptional, - reason: "no-op validator allows everything", - }, - "flow_document": Constraint { - r#type: FieldOptional, - reason: "no-op validator allows everything", - }, - "flow_published_at": Constraint { - r#type: FieldOptional, - reason: "no-op validator allows everything", - }, - "id": Constraint { - r#type: FieldOptional, - reason: "no-op validator allows everything", - }, - }, - resource_path: [ - "binding-0", - ], - delta_updates: true, - }, - ], + bindings: [], }, spec: MaterializationSpec { - name: "testing/test_dekaf", + name: "good", connector_type: Dekaf, - config_json: "\"example/dekaf.yaml\"", - bindings: [ - Binding { - resource_config_json: "{}", - resource_path: [ - "binding-0", - ], - collection: Some( - CollectionSpec { - name: "testing/schema_with_properties", - write_schema_json: "{\"$id\":\"test://example/catalog.yaml?ptr=/collections/testing~1schema_with_properties/schema\",\"properties\":{\"id\":{\"type\":\"string\"}},\"required\":[\"id\"],\"type\":\"object\"}", - read_schema_json: "", - key: [ - "/id", - ], - uuid_ptr: "/_meta/uuid", - partition_fields: [], - projections: [ - Projection { - ptr: "/_meta/flow_truncated", - field: "_meta/flow_truncated", - explicit: false, - is_partition_key: false, - is_primary_key: false, - inference: Some( - Inference { - types: [ - "boolean", - ], - string: None, - title: "Flow truncation indicator", - description: "Indicates whether any of the materialized values for this row have been truncated to make them fit inside the limitations of the destination system.", - default_json: "", - secret: false, - exists: Must, - numeric: None, - }, - ), - }, - Projection { - ptr: "", - field: "flow_document", - explicit: false, - is_partition_key: false, - is_primary_key: false, - inference: Some( - Inference { - types: [ - "object", - ], - string: None, - title: "", - description: "", - default_json: "", - secret: false, - exists: Must, - numeric: None, - }, - ), - }, - Projection { - ptr: "/_meta/uuid", - field: "flow_published_at", - explicit: false, - is_partition_key: false, - is_primary_key: false, - inference: Some( - Inference { - types: [ - "string", - ], - string: Some( - String { - content_type: "", - format: "date-time", - content_encoding: "uuid", - max_length: 0, - }, - ), - title: "Flow Publication Time", - description: "Flow publication date-time of this document", - default_json: "", - secret: false, - exists: Must, - numeric: None, - }, - ), - }, - Projection { - ptr: "/id", - field: "id", - explicit: false, - is_partition_key: false, - is_primary_key: true, - inference: Some( - Inference { - types: [ - "string", - ], - string: Some( - String { - content_type: "", - format: "", - content_encoding: "", - max_length: 0, - }, - ), - title: "", - description: "", - default_json: "", - secret: false, - exists: Must, - numeric: None, - }, - ), - }, - ], - ack_template_json: "{\"_meta\":{\"ack\":true,\"uuid\":\"DocUUIDPlaceholder-329Bb50aa48EAa9ef\"}}", - partition_template: Some( - JournalSpec { - name: "testing/schema_with_properties/2020202020202020", - replication: 3, - labels: Some( - LabelSet { - labels: [ - Label { - name: "app.gazette.dev/managed-by", - value: "estuary.dev/flow", - prefix: false, - }, - Label { - name: "content-type", - value: "application/x-ndjson", - prefix: false, - }, - Label { - name: "estuary.dev/build", - value: "2121212121212121", - prefix: false, - }, - Label { - name: "estuary.dev/collection", - value: "testing/schema_with_properties", - prefix: false, - }, - ], - }, - ), - fragment: Some( - Fragment { - length: 536870912, - compression_codec: Gzip, - stores: [ - "s3://a-bucket/", - ], - refresh_interval: Some( - Duration { - seconds: 300, - nanos: 0, - }, - ), - retention: None, - flush_interval: None, - path_postfix_template: "utc_date={{.Spool.FirstAppendTime.Format \"2006-01-02\"}}/utc_hour={{.Spool.FirstAppendTime.Format \"15\"}}", - }, - ), - flags: 4, - max_append_rate: 4194304, - }, - ), - derivation: None, - }, - ), - partition_selector: Some( - LabelSelector { - include: Some( - LabelSet { - labels: [ - Label { - name: "estuary.dev/collection", - value: "testing/schema_with_properties", - prefix: false, - }, - ], - }, - ), - exclude: Some( - LabelSet { - labels: [], - }, - ), - }, - ), - priority: 0, - field_selection: Some( - FieldSelection { - keys: [], - values: [], - document: "", - field_config_json_map: {}, - }, - ), - delta_updates: true, - deprecated_shuffle: None, - journal_read_suffix: "materialize/testing/test_dekaf/binding-0", - not_before: None, - not_after: None, - backfill: 0, - state_key: "binding-0", - }, - ], + config_json: "{\"variant\":\"foo\",\"config\":\"test://example/dekaf\"}", + bindings: [], shard_template: Some( ShardSpec { - id: "materialize/testing/test_dekaf/2020202020202020", + id: "materialize/good/2020202020202020", sources: [], recovery_log_prefix: "recovery", hint_prefix: "/estuary/flow/hints", @@ -494,7 +72,7 @@ Outcome { }, Label { name: "estuary.dev/task-name", - value: "testing/test_dekaf", + value: "good", prefix: false, }, Label { @@ -512,7 +90,7 @@ Outcome { ), recovery_log_template: Some( JournalSpec { - name: "recovery/materialize/testing/test_dekaf/2020202020202020", + name: "recovery/materialize/good/2020202020202020", replication: 3, labels: Some( LabelSet { @@ -534,7 +112,7 @@ Outcome { }, Label { name: "estuary.dev/task-name", - value: "testing/test_dekaf", + value: "good", prefix: false, }, Label { @@ -571,32 +149,14 @@ Outcome { }, previous_spec: NULL, is_touch: 0, - dependency_hash: 32182596aeb1e4a0, + dependency_hash: NULL, }, ], built_tests: [], captures: [], - collections: [ - DraftCollection { - collection: testing/schema_with_properties, - scope: test://example/catalog.yaml#/collections/testing~1schema_with_properties, - expect_pub_id: NULL, - model: { - "schema": {"$id":"test://example/catalog.yaml?ptr=/collections/testing~1schema_with_properties/schema","properties":{"id":{"type":"string"}},"required":["id"],"type":"object"}, - "key": [ - "/id" - ] - }, - is_touch: 0, - }, - ], + collections: [], errors: [], - errors_draft: [ - Error { - scope: test://example/catalog.yaml#/materializations/testing~1test_dekaf/endpoint/dekaf/config, - error: failed to fetch resource test://example/example/dekaf.yaml: fixture not found, - }, - ], + errors_draft: [], fetches: [ Fetch { depth: 1, @@ -604,33 +164,28 @@ Outcome { }, Fetch { depth: 2, - resource: test://example/example/dekaf.yaml, + resource: test://example/dekaf, }, ], imports: [ Import { - scope: test://example/catalog.yaml#/materializations/testing~1test_dekaf/endpoint/dekaf/config, - to_resource: test://example/example/dekaf.yaml, + scope: test://example/catalog.yaml#/materializations/good/endpoint/dekaf/config, + to_resource: test://example/dekaf, }, ], materializations: [ DraftMaterialization { - materialization: testing/test_dekaf, - scope: test://example/catalog.yaml#/materializations/testing~1test_dekaf, + materialization: good, + scope: test://example/catalog.yaml#/materializations/good, expect_pub_id: NULL, model: { "endpoint": { - "dekaf": "example/dekaf.yaml" - }, - "bindings": [ - { - "resource": {}, - "source": "testing/schema_with_properties", - "fields": { - "recommended": true - } + "dekaf": { + "variant": "foo", + "config": "test://example/dekaf" } - ] + }, + "bindings": [] }, is_touch: 0, }, @@ -640,7 +195,13 @@ Outcome { resource: test://example/catalog.yaml, content_type: "CATALOG", content: ".. binary ..", - content_dom: {"collections":{"testing/schema_with_properties":{"key":["/id"],"schema":{"properties":{"id":{"type":"string"}},"required":["id"],"type":"object"}}},"materializations":{"testing/test_dekaf":{"bindings":[{"resource":{},"source":"testing/schema_with_properties"}],"endpoint":{"dekaf":"example/dekaf.yaml"}}}}, + content_dom: {"materializations":{"good":{"bindings":[],"endpoint":{"dekaf":{"config":"test://example/dekaf","variant":"foo"}}}}}, + }, + Resource { + resource: test://example/dekaf, + content_type: "CONFIG", + content: ".. binary ..", + content_dom: "{\"strict_topic_names\":false}", }, ], storage_mappings: [ diff --git a/crates/validation/tests/snapshots/scenario_tests__dekaf_materialization_inline_config.snap b/crates/validation/tests/snapshots/scenario_tests__dekaf_materialization_inline_config.snap index 61615d4d03..6cfa7d5c48 100644 --- a/crates/validation/tests/snapshots/scenario_tests__dekaf_materialization_inline_config.snap +++ b/crates/validation/tests/snapshots/scenario_tests__dekaf_materialization_inline_config.snap @@ -1,461 +1,39 @@ --- source: crates/validation/tests/scenario_tests.rs -assertion_line: 38 +assertion_line: 36 expression: outcome --- Outcome { built_captures: [], - built_collections: [ - BuiltCollection { - collection: testing/schema_with_properties, - scope: test://example/catalog.yaml#/collections/testing~1schema_with_properties, - control_id: "0000000000000000", - data_plane_id: "1d1d1d1d1d1d1d1d", - expect_pub_id: "0000000000000000", - expect_build_id: "0000000000000000", - model: { - "schema": {"$id":"test://example/catalog.yaml?ptr=/collections/testing~1schema_with_properties/schema","properties":{"id":{"type":"string"}},"required":["id"],"type":"object"}, - "key": [ - "/id" - ] - }, - validated: NULL, - spec: CollectionSpec { - name: "testing/schema_with_properties", - write_schema_json: "{\"$id\":\"test://example/catalog.yaml?ptr=/collections/testing~1schema_with_properties/schema\",\"properties\":{\"id\":{\"type\":\"string\"}},\"required\":[\"id\"],\"type\":\"object\"}", - read_schema_json: "", - key: [ - "/id", - ], - uuid_ptr: "/_meta/uuid", - partition_fields: [], - projections: [ - Projection { - ptr: "/_meta/flow_truncated", - field: "_meta/flow_truncated", - explicit: false, - is_partition_key: false, - is_primary_key: false, - inference: Some( - Inference { - types: [ - "boolean", - ], - string: None, - title: "Flow truncation indicator", - description: "Indicates whether any of the materialized values for this row have been truncated to make them fit inside the limitations of the destination system.", - default_json: "", - secret: false, - exists: Must, - numeric: None, - }, - ), - }, - Projection { - ptr: "", - field: "flow_document", - explicit: false, - is_partition_key: false, - is_primary_key: false, - inference: Some( - Inference { - types: [ - "object", - ], - string: None, - title: "", - description: "", - default_json: "", - secret: false, - exists: Must, - numeric: None, - }, - ), - }, - Projection { - ptr: "/_meta/uuid", - field: "flow_published_at", - explicit: false, - is_partition_key: false, - is_primary_key: false, - inference: Some( - Inference { - types: [ - "string", - ], - string: Some( - String { - content_type: "", - format: "date-time", - content_encoding: "uuid", - max_length: 0, - }, - ), - title: "Flow Publication Time", - description: "Flow publication date-time of this document", - default_json: "", - secret: false, - exists: Must, - numeric: None, - }, - ), - }, - Projection { - ptr: "/id", - field: "id", - explicit: false, - is_partition_key: false, - is_primary_key: true, - inference: Some( - Inference { - types: [ - "string", - ], - string: Some( - String { - content_type: "", - format: "", - content_encoding: "", - max_length: 0, - }, - ), - title: "", - description: "", - default_json: "", - secret: false, - exists: Must, - numeric: None, - }, - ), - }, - ], - ack_template_json: "{\"_meta\":{\"ack\":true,\"uuid\":\"DocUUIDPlaceholder-329Bb50aa48EAa9ef\"}}", - partition_template: Some( - JournalSpec { - name: "testing/schema_with_properties/2020202020202020", - replication: 3, - labels: Some( - LabelSet { - labels: [ - Label { - name: "app.gazette.dev/managed-by", - value: "estuary.dev/flow", - prefix: false, - }, - Label { - name: "content-type", - value: "application/x-ndjson", - prefix: false, - }, - Label { - name: "estuary.dev/build", - value: "2121212121212121", - prefix: false, - }, - Label { - name: "estuary.dev/collection", - value: "testing/schema_with_properties", - prefix: false, - }, - ], - }, - ), - fragment: Some( - Fragment { - length: 536870912, - compression_codec: Gzip, - stores: [ - "s3://a-bucket/", - ], - refresh_interval: Some( - Duration { - seconds: 300, - nanos: 0, - }, - ), - retention: None, - flush_interval: None, - path_postfix_template: "utc_date={{.Spool.FirstAppendTime.Format \"2006-01-02\"}}/utc_hour={{.Spool.FirstAppendTime.Format \"15\"}}", - }, - ), - flags: 4, - max_append_rate: 4194304, - }, - ), - derivation: None, - }, - previous_spec: NULL, - is_touch: 0, - dependency_hash: NULL, - }, - ], + built_collections: [], built_materializations: [ BuiltMaterialization { - materialization: testing/test_dekaf, - scope: test://example/catalog.yaml#/materializations/testing~1test_dekaf, + materialization: good, + scope: test://example/catalog.yaml#/materializations/good, control_id: "0000000000000000", data_plane_id: "1d1d1d1d1d1d1d1d", expect_pub_id: "0000000000000000", expect_build_id: "0000000000000000", model: { "endpoint": { - "dekaf": {} - }, - "bindings": [ - { - "resource": {}, - "source": "testing/schema_with_properties", - "fields": { - "recommended": true - } + "dekaf": { + "variant": "foo", + "config": {"strict_topic_names":false} } - ] + }, + "bindings": [] }, validated: Validated { - bindings: [ - Binding { - constraints: { - "_meta/flow_truncated": Constraint { - r#type: FieldOptional, - reason: "no-op validator allows everything", - }, - "flow_document": Constraint { - r#type: FieldOptional, - reason: "no-op validator allows everything", - }, - "flow_published_at": Constraint { - r#type: FieldOptional, - reason: "no-op validator allows everything", - }, - "id": Constraint { - r#type: FieldOptional, - reason: "no-op validator allows everything", - }, - }, - resource_path: [ - "binding-0", - ], - delta_updates: true, - }, - ], + bindings: [], }, spec: MaterializationSpec { - name: "testing/test_dekaf", + name: "good", connector_type: Dekaf, - config_json: "{}", - bindings: [ - Binding { - resource_config_json: "{}", - resource_path: [ - "binding-0", - ], - collection: Some( - CollectionSpec { - name: "testing/schema_with_properties", - write_schema_json: "{\"$id\":\"test://example/catalog.yaml?ptr=/collections/testing~1schema_with_properties/schema\",\"properties\":{\"id\":{\"type\":\"string\"}},\"required\":[\"id\"],\"type\":\"object\"}", - read_schema_json: "", - key: [ - "/id", - ], - uuid_ptr: "/_meta/uuid", - partition_fields: [], - projections: [ - Projection { - ptr: "/_meta/flow_truncated", - field: "_meta/flow_truncated", - explicit: false, - is_partition_key: false, - is_primary_key: false, - inference: Some( - Inference { - types: [ - "boolean", - ], - string: None, - title: "Flow truncation indicator", - description: "Indicates whether any of the materialized values for this row have been truncated to make them fit inside the limitations of the destination system.", - default_json: "", - secret: false, - exists: Must, - numeric: None, - }, - ), - }, - Projection { - ptr: "", - field: "flow_document", - explicit: false, - is_partition_key: false, - is_primary_key: false, - inference: Some( - Inference { - types: [ - "object", - ], - string: None, - title: "", - description: "", - default_json: "", - secret: false, - exists: Must, - numeric: None, - }, - ), - }, - Projection { - ptr: "/_meta/uuid", - field: "flow_published_at", - explicit: false, - is_partition_key: false, - is_primary_key: false, - inference: Some( - Inference { - types: [ - "string", - ], - string: Some( - String { - content_type: "", - format: "date-time", - content_encoding: "uuid", - max_length: 0, - }, - ), - title: "Flow Publication Time", - description: "Flow publication date-time of this document", - default_json: "", - secret: false, - exists: Must, - numeric: None, - }, - ), - }, - Projection { - ptr: "/id", - field: "id", - explicit: false, - is_partition_key: false, - is_primary_key: true, - inference: Some( - Inference { - types: [ - "string", - ], - string: Some( - String { - content_type: "", - format: "", - content_encoding: "", - max_length: 0, - }, - ), - title: "", - description: "", - default_json: "", - secret: false, - exists: Must, - numeric: None, - }, - ), - }, - ], - ack_template_json: "{\"_meta\":{\"ack\":true,\"uuid\":\"DocUUIDPlaceholder-329Bb50aa48EAa9ef\"}}", - partition_template: Some( - JournalSpec { - name: "testing/schema_with_properties/2020202020202020", - replication: 3, - labels: Some( - LabelSet { - labels: [ - Label { - name: "app.gazette.dev/managed-by", - value: "estuary.dev/flow", - prefix: false, - }, - Label { - name: "content-type", - value: "application/x-ndjson", - prefix: false, - }, - Label { - name: "estuary.dev/build", - value: "2121212121212121", - prefix: false, - }, - Label { - name: "estuary.dev/collection", - value: "testing/schema_with_properties", - prefix: false, - }, - ], - }, - ), - fragment: Some( - Fragment { - length: 536870912, - compression_codec: Gzip, - stores: [ - "s3://a-bucket/", - ], - refresh_interval: Some( - Duration { - seconds: 300, - nanos: 0, - }, - ), - retention: None, - flush_interval: None, - path_postfix_template: "utc_date={{.Spool.FirstAppendTime.Format \"2006-01-02\"}}/utc_hour={{.Spool.FirstAppendTime.Format \"15\"}}", - }, - ), - flags: 4, - max_append_rate: 4194304, - }, - ), - derivation: None, - }, - ), - partition_selector: Some( - LabelSelector { - include: Some( - LabelSet { - labels: [ - Label { - name: "estuary.dev/collection", - value: "testing/schema_with_properties", - prefix: false, - }, - ], - }, - ), - exclude: Some( - LabelSet { - labels: [], - }, - ), - }, - ), - priority: 0, - field_selection: Some( - FieldSelection { - keys: [], - values: [], - document: "", - field_config_json_map: {}, - }, - ), - delta_updates: true, - deprecated_shuffle: None, - journal_read_suffix: "materialize/testing/test_dekaf/binding-0", - not_before: None, - not_after: None, - backfill: 0, - state_key: "binding-0", - }, - ], + config_json: "{\"variant\":\"foo\",\"config\":{\"strict_topic_names\":false}}", + bindings: [], shard_template: Some( ShardSpec { - id: "materialize/testing/test_dekaf/2020202020202020", + id: "materialize/good/2020202020202020", sources: [], recovery_log_prefix: "recovery", hint_prefix: "/estuary/flow/hints", @@ -494,7 +72,7 @@ Outcome { }, Label { name: "estuary.dev/task-name", - value: "testing/test_dekaf", + value: "good", prefix: false, }, Label { @@ -512,7 +90,7 @@ Outcome { ), recovery_log_template: Some( JournalSpec { - name: "recovery/materialize/testing/test_dekaf/2020202020202020", + name: "recovery/materialize/good/2020202020202020", replication: 3, labels: Some( LabelSet { @@ -534,7 +112,7 @@ Outcome { }, Label { name: "estuary.dev/task-name", - value: "testing/test_dekaf", + value: "good", prefix: false, }, Label { @@ -571,25 +149,12 @@ Outcome { }, previous_spec: NULL, is_touch: 0, - dependency_hash: 32182596aeb1e4a0, + dependency_hash: NULL, }, ], built_tests: [], captures: [], - collections: [ - DraftCollection { - collection: testing/schema_with_properties, - scope: test://example/catalog.yaml#/collections/testing~1schema_with_properties, - expect_pub_id: NULL, - model: { - "schema": {"$id":"test://example/catalog.yaml?ptr=/collections/testing~1schema_with_properties/schema","properties":{"id":{"type":"string"}},"required":["id"],"type":"object"}, - "key": [ - "/id" - ] - }, - is_touch: 0, - }, - ], + collections: [], errors: [], errors_draft: [], fetches: [ @@ -601,22 +166,17 @@ Outcome { imports: [], materializations: [ DraftMaterialization { - materialization: testing/test_dekaf, - scope: test://example/catalog.yaml#/materializations/testing~1test_dekaf, + materialization: good, + scope: test://example/catalog.yaml#/materializations/good, expect_pub_id: NULL, model: { "endpoint": { - "dekaf": {} - }, - "bindings": [ - { - "resource": {}, - "source": "testing/schema_with_properties", - "fields": { - "recommended": true - } + "dekaf": { + "variant": "foo", + "config": {"strict_topic_names":false} } - ] + }, + "bindings": [] }, is_touch: 0, }, @@ -626,7 +186,7 @@ Outcome { resource: test://example/catalog.yaml, content_type: "CATALOG", content: ".. binary ..", - content_dom: {"collections":{"testing/schema_with_properties":{"key":["/id"],"schema":{"properties":{"id":{"type":"string"}},"required":["id"],"type":"object"}}},"materializations":{"testing/test_dekaf":{"bindings":[{"resource":{},"source":"testing/schema_with_properties"}],"endpoint":{"dekaf":{}}}}}, + content_dom: {"materializations":{"good":{"bindings":[],"endpoint":{"dekaf":{"config":{"strict_topic_names":false},"variant":"foo"}}}}}, }, ], storage_mappings: [ diff --git a/crates/validation/tests/snapshots/scenario_tests__dekaf_materialization_invalid.snap b/crates/validation/tests/snapshots/scenario_tests__dekaf_materialization_invalid.snap index 697478a621..34fc8a0dec 100644 --- a/crates/validation/tests/snapshots/scenario_tests__dekaf_materialization_invalid.snap +++ b/crates/validation/tests/snapshots/scenario_tests__dekaf_materialization_invalid.snap @@ -1,6 +1,6 @@ --- source: crates/validation/tests/scenario_tests.rs -assertion_line: 100 +assertion_line: 98 expression: outcome --- Outcome { @@ -14,7 +14,7 @@ Outcome { errors_draft: [ Error { scope: test://example/catalog.yaml, - error: failed to parse document (data did not match any variant of untagged enum DekafConfigContainer at line 1 column 288): data did not match any variant of untagged enum DekafConfigContainer at line 1 column 288, + error: failed to parse document (invalid type: boolean `false`, expected struct DekafConfig at line 1 column 215): invalid type: boolean `false`, expected struct DekafConfig at line 1 column 215, }, ], fetches: [ @@ -30,7 +30,7 @@ Outcome { resource: test://example/catalog.yaml, content_type: "CATALOG", content: ".. binary ..", - content_dom: {"collections":{"testing/schema_with_properties":{"key":["/id"],"schema":{"properties":{"id":{"type":"string"}},"required":["id"],"type":"object"}}},"materializations":{"testing/test_dekaf":{"bindings":[{"resource":{},"source":"testing/schema_with_properties"}],"endpoint":{"dekaf":false}}}}, + content_dom: {"collections":{"testing/schema_with_properties":{"key":["/id"],"schema":{"properties":{"id":{"type":"string"}},"required":["id"],"type":"object"}}},"materializations":{"bad":{"bindings":[],"endpoint":{"dekaf":false}}}}, }, ], storage_mappings: [ diff --git a/crates/validation/tests/snapshots/scenario_tests__dekaf_materialization_nonexistent.snap b/crates/validation/tests/snapshots/scenario_tests__dekaf_materialization_nonexistent.snap index 7240236b95..744e672bc5 100644 --- a/crates/validation/tests/snapshots/scenario_tests__dekaf_materialization_nonexistent.snap +++ b/crates/validation/tests/snapshots/scenario_tests__dekaf_materialization_nonexistent.snap @@ -1,599 +1,24 @@ --- source: crates/validation/tests/scenario_tests.rs -assertion_line: 131 +assertion_line: 125 expression: outcome --- Outcome { built_captures: [], - built_collections: [ - BuiltCollection { - collection: testing/schema_with_properties, - scope: test://example/catalog.yaml#/collections/testing~1schema_with_properties, - control_id: "0000000000000000", - data_plane_id: "1d1d1d1d1d1d1d1d", - expect_pub_id: "0000000000000000", - expect_build_id: "0000000000000000", - model: { - "schema": {"$id":"test://example/catalog.yaml?ptr=/collections/testing~1schema_with_properties/schema","properties":{"id":{"type":"string"}},"required":["id"],"type":"object"}, - "key": [ - "/id" - ] - }, - validated: NULL, - spec: CollectionSpec { - name: "testing/schema_with_properties", - write_schema_json: "{\"$id\":\"test://example/catalog.yaml?ptr=/collections/testing~1schema_with_properties/schema\",\"properties\":{\"id\":{\"type\":\"string\"}},\"required\":[\"id\"],\"type\":\"object\"}", - read_schema_json: "", - key: [ - "/id", - ], - uuid_ptr: "/_meta/uuid", - partition_fields: [], - projections: [ - Projection { - ptr: "/_meta/flow_truncated", - field: "_meta/flow_truncated", - explicit: false, - is_partition_key: false, - is_primary_key: false, - inference: Some( - Inference { - types: [ - "boolean", - ], - string: None, - title: "Flow truncation indicator", - description: "Indicates whether any of the materialized values for this row have been truncated to make them fit inside the limitations of the destination system.", - default_json: "", - secret: false, - exists: Must, - numeric: None, - }, - ), - }, - Projection { - ptr: "", - field: "flow_document", - explicit: false, - is_partition_key: false, - is_primary_key: false, - inference: Some( - Inference { - types: [ - "object", - ], - string: None, - title: "", - description: "", - default_json: "", - secret: false, - exists: Must, - numeric: None, - }, - ), - }, - Projection { - ptr: "/_meta/uuid", - field: "flow_published_at", - explicit: false, - is_partition_key: false, - is_primary_key: false, - inference: Some( - Inference { - types: [ - "string", - ], - string: Some( - String { - content_type: "", - format: "date-time", - content_encoding: "uuid", - max_length: 0, - }, - ), - title: "Flow Publication Time", - description: "Flow publication date-time of this document", - default_json: "", - secret: false, - exists: Must, - numeric: None, - }, - ), - }, - Projection { - ptr: "/id", - field: "id", - explicit: false, - is_partition_key: false, - is_primary_key: true, - inference: Some( - Inference { - types: [ - "string", - ], - string: Some( - String { - content_type: "", - format: "", - content_encoding: "", - max_length: 0, - }, - ), - title: "", - description: "", - default_json: "", - secret: false, - exists: Must, - numeric: None, - }, - ), - }, - ], - ack_template_json: "{\"_meta\":{\"ack\":true,\"uuid\":\"DocUUIDPlaceholder-329Bb50aa48EAa9ef\"}}", - partition_template: Some( - JournalSpec { - name: "testing/schema_with_properties/2020202020202020", - replication: 3, - labels: Some( - LabelSet { - labels: [ - Label { - name: "app.gazette.dev/managed-by", - value: "estuary.dev/flow", - prefix: false, - }, - Label { - name: "content-type", - value: "application/x-ndjson", - prefix: false, - }, - Label { - name: "estuary.dev/build", - value: "2121212121212121", - prefix: false, - }, - Label { - name: "estuary.dev/collection", - value: "testing/schema_with_properties", - prefix: false, - }, - ], - }, - ), - fragment: Some( - Fragment { - length: 536870912, - compression_codec: Gzip, - stores: [ - "s3://a-bucket/", - ], - refresh_interval: Some( - Duration { - seconds: 300, - nanos: 0, - }, - ), - retention: None, - flush_interval: None, - path_postfix_template: "utc_date={{.Spool.FirstAppendTime.Format \"2006-01-02\"}}/utc_hour={{.Spool.FirstAppendTime.Format \"15\"}}", - }, - ), - flags: 4, - max_append_rate: 4194304, - }, - ), - derivation: None, - }, - previous_spec: NULL, - is_touch: 0, - dependency_hash: NULL, - }, - ], - built_materializations: [ - BuiltMaterialization { - materialization: testing/test_dekaf, - scope: test://example/catalog.yaml#/materializations/testing~1test_dekaf, - control_id: "0000000000000000", - data_plane_id: "1d1d1d1d1d1d1d1d", - expect_pub_id: "0000000000000000", - expect_build_id: "0000000000000000", - model: { - "endpoint": { - "dekaf": "foo/bar" - }, - "bindings": [ - { - "resource": {}, - "source": "testing/schema_with_properties", - "fields": { - "recommended": true - } - } - ] - }, - validated: Validated { - bindings: [ - Binding { - constraints: { - "_meta/flow_truncated": Constraint { - r#type: FieldOptional, - reason: "no-op validator allows everything", - }, - "flow_document": Constraint { - r#type: FieldOptional, - reason: "no-op validator allows everything", - }, - "flow_published_at": Constraint { - r#type: FieldOptional, - reason: "no-op validator allows everything", - }, - "id": Constraint { - r#type: FieldOptional, - reason: "no-op validator allows everything", - }, - }, - resource_path: [ - "binding-0", - ], - delta_updates: true, - }, - ], - }, - spec: MaterializationSpec { - name: "testing/test_dekaf", - connector_type: Dekaf, - config_json: "\"foo/bar\"", - bindings: [ - Binding { - resource_config_json: "{}", - resource_path: [ - "binding-0", - ], - collection: Some( - CollectionSpec { - name: "testing/schema_with_properties", - write_schema_json: "{\"$id\":\"test://example/catalog.yaml?ptr=/collections/testing~1schema_with_properties/schema\",\"properties\":{\"id\":{\"type\":\"string\"}},\"required\":[\"id\"],\"type\":\"object\"}", - read_schema_json: "", - key: [ - "/id", - ], - uuid_ptr: "/_meta/uuid", - partition_fields: [], - projections: [ - Projection { - ptr: "/_meta/flow_truncated", - field: "_meta/flow_truncated", - explicit: false, - is_partition_key: false, - is_primary_key: false, - inference: Some( - Inference { - types: [ - "boolean", - ], - string: None, - title: "Flow truncation indicator", - description: "Indicates whether any of the materialized values for this row have been truncated to make them fit inside the limitations of the destination system.", - default_json: "", - secret: false, - exists: Must, - numeric: None, - }, - ), - }, - Projection { - ptr: "", - field: "flow_document", - explicit: false, - is_partition_key: false, - is_primary_key: false, - inference: Some( - Inference { - types: [ - "object", - ], - string: None, - title: "", - description: "", - default_json: "", - secret: false, - exists: Must, - numeric: None, - }, - ), - }, - Projection { - ptr: "/_meta/uuid", - field: "flow_published_at", - explicit: false, - is_partition_key: false, - is_primary_key: false, - inference: Some( - Inference { - types: [ - "string", - ], - string: Some( - String { - content_type: "", - format: "date-time", - content_encoding: "uuid", - max_length: 0, - }, - ), - title: "Flow Publication Time", - description: "Flow publication date-time of this document", - default_json: "", - secret: false, - exists: Must, - numeric: None, - }, - ), - }, - Projection { - ptr: "/id", - field: "id", - explicit: false, - is_partition_key: false, - is_primary_key: true, - inference: Some( - Inference { - types: [ - "string", - ], - string: Some( - String { - content_type: "", - format: "", - content_encoding: "", - max_length: 0, - }, - ), - title: "", - description: "", - default_json: "", - secret: false, - exists: Must, - numeric: None, - }, - ), - }, - ], - ack_template_json: "{\"_meta\":{\"ack\":true,\"uuid\":\"DocUUIDPlaceholder-329Bb50aa48EAa9ef\"}}", - partition_template: Some( - JournalSpec { - name: "testing/schema_with_properties/2020202020202020", - replication: 3, - labels: Some( - LabelSet { - labels: [ - Label { - name: "app.gazette.dev/managed-by", - value: "estuary.dev/flow", - prefix: false, - }, - Label { - name: "content-type", - value: "application/x-ndjson", - prefix: false, - }, - Label { - name: "estuary.dev/build", - value: "2121212121212121", - prefix: false, - }, - Label { - name: "estuary.dev/collection", - value: "testing/schema_with_properties", - prefix: false, - }, - ], - }, - ), - fragment: Some( - Fragment { - length: 536870912, - compression_codec: Gzip, - stores: [ - "s3://a-bucket/", - ], - refresh_interval: Some( - Duration { - seconds: 300, - nanos: 0, - }, - ), - retention: None, - flush_interval: None, - path_postfix_template: "utc_date={{.Spool.FirstAppendTime.Format \"2006-01-02\"}}/utc_hour={{.Spool.FirstAppendTime.Format \"15\"}}", - }, - ), - flags: 4, - max_append_rate: 4194304, - }, - ), - derivation: None, - }, - ), - partition_selector: Some( - LabelSelector { - include: Some( - LabelSet { - labels: [ - Label { - name: "estuary.dev/collection", - value: "testing/schema_with_properties", - prefix: false, - }, - ], - }, - ), - exclude: Some( - LabelSet { - labels: [], - }, - ), - }, - ), - priority: 0, - field_selection: Some( - FieldSelection { - keys: [], - values: [], - document: "", - field_config_json_map: {}, - }, - ), - delta_updates: true, - deprecated_shuffle: None, - journal_read_suffix: "materialize/testing/test_dekaf/binding-0", - not_before: None, - not_after: None, - backfill: 0, - state_key: "binding-0", - }, - ], - shard_template: Some( - ShardSpec { - id: "materialize/testing/test_dekaf/2020202020202020", - sources: [], - recovery_log_prefix: "recovery", - hint_prefix: "/estuary/flow/hints", - hint_backups: 2, - max_txn_duration: Some( - Duration { - seconds: 300, - nanos: 0, - }, - ), - min_txn_duration: Some( - Duration { - seconds: 0, - nanos: 0, - }, - ), - disable: false, - hot_standbys: 0, - labels: Some( - LabelSet { - labels: [ - Label { - name: "app.gazette.dev/managed-by", - value: "estuary.dev/flow", - prefix: false, - }, - Label { - name: "estuary.dev/build", - value: "2121212121212121", - prefix: false, - }, - Label { - name: "estuary.dev/log-level", - value: "info", - prefix: false, - }, - Label { - name: "estuary.dev/task-name", - value: "testing/test_dekaf", - prefix: false, - }, - Label { - name: "estuary.dev/task-type", - value: "materialization", - prefix: false, - }, - ], - }, - ), - disable_wait_for_ack: false, - ring_buffer_size: 65536, - read_channel_size: 4096, - }, - ), - recovery_log_template: Some( - JournalSpec { - name: "recovery/materialize/testing/test_dekaf/2020202020202020", - replication: 3, - labels: Some( - LabelSet { - labels: [ - Label { - name: "app.gazette.dev/managed-by", - value: "estuary.dev/flow", - prefix: false, - }, - Label { - name: "content-type", - value: "application/x-gazette-recoverylog", - prefix: false, - }, - Label { - name: "estuary.dev/build", - value: "2121212121212121", - prefix: false, - }, - Label { - name: "estuary.dev/task-name", - value: "testing/test_dekaf", - prefix: false, - }, - Label { - name: "estuary.dev/task-type", - value: "materialization", - prefix: false, - }, - ], - }, - ), - fragment: Some( - Fragment { - length: 268435456, - compression_codec: Snappy, - stores: [ - "s3://a-bucket/", - ], - refresh_interval: Some( - Duration { - seconds: 300, - nanos: 0, - }, - ), - retention: None, - flush_interval: None, - path_postfix_template: "", - }, - ), - flags: 4, - max_append_rate: 4194304, - }, - ), - network_ports: [], - }, - previous_spec: NULL, - is_touch: 0, - dependency_hash: 32182596aeb1e4a0, - }, - ], + built_collections: [], + built_materializations: [], built_tests: [], captures: [], - collections: [ - DraftCollection { - collection: testing/schema_with_properties, - scope: test://example/catalog.yaml#/collections/testing~1schema_with_properties, - expect_pub_id: NULL, - model: { - "schema": {"$id":"test://example/catalog.yaml?ptr=/collections/testing~1schema_with_properties/schema","properties":{"id":{"type":"string"}},"required":["id"],"type":"object"}, - "key": [ - "/id" - ] - }, - is_touch: 0, + collections: [], + errors: [ + Error { + scope: test://example/catalog.yaml#/materializations/bad, + error: driver fixture not found: bad, }, ], - errors: [], errors_draft: [ Error { - scope: test://example/catalog.yaml#/materializations/testing~1test_dekaf/endpoint/dekaf/config, + scope: test://example/catalog.yaml#/materializations/bad/endpoint/dekaf/config, error: failed to fetch resource test://example/foo/bar: fixture not found, }, ], @@ -609,28 +34,23 @@ Outcome { ], imports: [ Import { - scope: test://example/catalog.yaml#/materializations/testing~1test_dekaf/endpoint/dekaf/config, + scope: test://example/catalog.yaml#/materializations/bad/endpoint/dekaf/config, to_resource: test://example/foo/bar, }, ], materializations: [ DraftMaterialization { - materialization: testing/test_dekaf, - scope: test://example/catalog.yaml#/materializations/testing~1test_dekaf, + materialization: bad, + scope: test://example/catalog.yaml#/materializations/bad, expect_pub_id: NULL, model: { "endpoint": { - "dekaf": "foo/bar" - }, - "bindings": [ - { - "resource": {}, - "source": "testing/schema_with_properties", - "fields": { - "recommended": true - } + "dekaf": { + "variant": "bar", + "config": "foo/bar" } - ] + }, + "bindings": [] }, is_touch: 0, }, @@ -640,7 +60,7 @@ Outcome { resource: test://example/catalog.yaml, content_type: "CATALOG", content: ".. binary ..", - content_dom: {"collections":{"testing/schema_with_properties":{"key":["/id"],"schema":{"properties":{"id":{"type":"string"}},"required":["id"],"type":"object"}}},"materializations":{"testing/test_dekaf":{"bindings":[{"resource":{},"source":"testing/schema_with_properties"}],"endpoint":{"dekaf":"foo/bar"}}}}, + content_dom: {"materializations":{"bad":{"bindings":[],"endpoint":{"dekaf":{"config":"foo/bar","variant":"bar"}}}}}, }, ], storage_mappings: [