[source-zoho-crm] No way to ignore missing 'Modified_Time' from source schema #53218

Open
1 task
MickPerring opened this issue Feb 7, 2025 · 3 comments
Labels
area/connectors Connector related issues autoteam community connectors/source/zoho-crm team/community-devs Community Engineers working on low-code sources team/use type/bug Something isn't working

Comments

@MickPerring

Connector Name

source-zoho-crm

Connector Version

0.1.3

What step the error happened?

During the sync

Relevant information

Issue

I have set up a connection from the ZohoCRM to a Postgres database. Syncs are failing for two of the streams because the 'Modified_Time' key, which the connector automatically sets as the cursor, is not found in the records.

Possible Cause

It seems the ZohoCRM API returns 'Modified_Time' from the settings endpoint for the two failing modules, so the schema expects the field, but the API does not return it when the records themselves are retrieved. In theory, setting the sync mode of the streams to 'Full refresh | Overwrite' and manually deselecting the 'Modified_Time' fields should work, since the connector would not attempt to sync those fields. In practice it fails the same way: the connector code explicitly sets cursor_field to "Modified_Time" and then breaks in the read_records method when it can't find the key in the records (this happens in streams.py).
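A minimal reproduction of the failure mode, assuming only the single line shown in the traceback (`datetime.datetime.fromisoformat(record[self.cursor_field])`). The helper name `latest_cursor_value` and both sample records are made up for illustration and are not taken from the connector:

```python
import datetime

# Cursor name as reported in the traceback; everything else here is hypothetical.
CURSOR_FIELD = "Modified_Time"


def latest_cursor_value(record: dict) -> datetime.datetime:
    # Same pattern as the failing line: direct key access, no fallback.
    return datetime.datetime.fromisoformat(record[CURSOR_FIELD])


record_with_cursor = {"id": "1", "Modified_Time": "2025-02-07T09:30:27+00:00"}
record_without_cursor = {"id": "2"}  # a record where the API omitted the field

ok = latest_cursor_value(record_with_cursor)

try:
    latest_cursor_value(record_without_cursor)
    error = None
except KeyError as exc:
    # str() of a KeyError is the quoted key, which is what ends up
    # as "internalMessage" in the failure report.
    error = str(exc)
```

The quoted key in `error` matches the `"internalMessage" : "'Modified_Time'"` entry in the log output, which is why the failure message looks so terse.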

Possible Solution

Perhaps there is a way to not explicitly rely on the 'Modified_Time' field being present in the records if it is deselected from the schema, or maybe it can just be set to a default value if the key is not found?
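A rough sketch of what "don't rely on the key being present" could look like. This is not the connector's code: `parse_cursor` and `track_cursor` are hypothetical helpers, and the sketch assumes records are plain dicts with ISO 8601 timestamps that all carry a UTC offset.

```python
import datetime
from typing import Iterable, Optional, Tuple

CURSOR_FIELD = "Modified_Time"


def parse_cursor(record: dict) -> Optional[datetime.datetime]:
    # Return None instead of raising when the cursor key is absent or empty.
    raw = record.get(CURSOR_FIELD)
    if not raw:
        return None
    return datetime.datetime.fromisoformat(raw)


def track_cursor(
    records: Iterable[dict],
) -> Tuple[list, Optional[datetime.datetime]]:
    # Emit every record; advance the cursor only for records that have one.
    latest: Optional[datetime.datetime] = None
    emitted = []
    for record in records:
        value = parse_cursor(record)
        if value is not None and (latest is None or value > latest):
            latest = value
        emitted.append(record)
    return emitted, latest


sample = [
    {"id": "1", "Modified_Time": "2025-02-01T10:00:00+00:00"},
    {"id": "2"},  # no cursor field at all
    {"id": "3", "Modified_Time": "2025-02-07T09:30:27+00:00"},
]
emitted, latest = track_cursor(sample)
```

One caveat with this approach: records without a cursor value cannot be filtered incrementally, so skipping the check is probably only safe for full refresh syncs.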

zohocrm___postgres_logs_27995180_txt.txt

Relevant log output

2025-02-07 09:30:27 destination ERROR main i.a.c.i.u.ConnectorExceptionHandler(handleException):68 caught exception! io.airbyte.commons.exceptions.TransientErrorException: Some streams were unsuccessful due to a source error. See logs for details.
	at io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer.close(AsyncStreamConsumer.kt:217) ~[airbyte-cdk-core-0.48.5.jar:?]
	at io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer$Companion$appendOnClose$1.close(SerializedAirbyteMessageConsumer.kt:70) ~[airbyte-cdk-core-0.48.5.jar:?]
	at kotlin.jdk7.AutoCloseableKt.closeFinally(AutoCloseableJVM.kt:48) ~[kotlin-stdlib-2.0.0.jar:2.0.0-release-341]
	at io.airbyte.cdk.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.kt:215) [airbyte-cdk-core-0.48.5.jar:?]
	at io.airbyte.cdk.integrations.base.IntegrationRunner.run(IntegrationRunner.kt:119) [airbyte-cdk-core-0.48.5.jar:?]
	at io.airbyte.cdk.integrations.base.IntegrationRunner.run$default(IntegrationRunner.kt:113) [airbyte-cdk-core-0.48.5.jar:?]
	at io.airbyte.integrations.destination.postgres.PostgresDestinationStrictEncrypt$Companion.main(PostgresDestinationStrictEncrypt.kt:74) [io.airbyte.airbyte-integrations.connectors-destination-postgres-strict-encrypt.jar:?]
	at io.airbyte.integrations.destination.postgres.PostgresDestinationStrictEncrypt.main(PostgresDestinationStrictEncrypt.kt) [io.airbyte.airbyte-integrations.connectors-destination-postgres-strict-encrypt.jar:?]

Stack Trace: io.airbyte.commons.exceptions.TransientErrorException: Some streams were unsuccessful due to a source error. See logs for details.
	at io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer.close(AsyncStreamConsumer.kt:217)
	at io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer$Companion$appendOnClose$1.close(SerializedAirbyteMessageConsumer.kt:70)
	at kotlin.jdk7.AutoCloseableKt.closeFinally(AutoCloseableJVM.kt:48)
	at io.airbyte.cdk.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.kt:215)
	at io.airbyte.cdk.integrations.base.IntegrationRunner.run(IntegrationRunner.kt:119)
	at io.airbyte.cdk.integrations.base.IntegrationRunner.run$default(IntegrationRunner.kt:113)
	at io.airbyte.integrations.destination.postgres.PostgresDestinationStrictEncrypt$Companion.main(PostgresDestinationStrictEncrypt.kt:74)
	at io.airbyte.integrations.destination.postgres.PostgresDestinationStrictEncrypt.main(PostgresDestinationStrictEncrypt.kt)
...
2025-02-07 09:30:28 replication-orchestrator INFO failures: [ {
  "failureOrigin" : "source",
  "failureType" : "system_error",
  "internalMessage" : "'Modified_Time'",
  "externalMessage" : "Something went wrong in the connector. See the logs for more details.",
  "metadata" : {
    "attemptNumber" : 0,
    "jobId" : 27995180,
    "from_trace_message" : true,
    "connector_command" : "read"
  },
  "stacktrace" : "Traceback (most recent call last):\n  File \"/usr/local/lib/python3.10/site-packages/airbyte_cdk/sources/abstract_source.py\", line 129, in read\n    stream_is_available, reason = stream_instance.check_availability(logger, self)\n  File \"/usr/local/lib/python3.10/site-packages/airbyte_cdk/sources/streams/core.py\", line 329, in check_availability\n    return self.availability_strategy.check_availability(self, logger, source)\n  File \"/usr/local/lib/python3.10/site-packages/airbyte_cdk/sources/streams/http/availability_strategy.py\", line 50, in check_availability\n    get_first_record_for_slice(stream, stream_slice)\n  File \"/usr/local/lib/python3.10/site-packages/airbyte_cdk/sources/streams/utils/stream_helper.py\", line 40, in get_first_record_for_slice\n    return next(records_for_slice)\n  File \"/airbyte/integration_code/source_zoho_crm/streams.py\", line 92, in read_records\n    latest_cursor_value = datetime.datetime.fromisoformat(record[self.cursor_field])\nKeyError: 'Modified_Time'\n",
  "timestamp" : 1738913398168,
  "streamDescriptor" : {
    "name" : "incremental_deals_zoho_crm_stream"
  }
}, {
  "failureOrigin" : "source",
  "failureType" : "config_error",
  "externalMessage" : "During the sync, the following streams did not sync successfully: incremental_deals_zoho_crm_stream: AirbyteTracedException(\"'Modified_Time'\")",
  "metadata" : {
    "attemptNumber" : 0,
    "jobId" : 27995180,
    "from_trace_message" : true,
    "connector_command" : "read"
  },
  "stacktrace" : "Traceback (most recent call last):\n  File \"/airbyte/integration_code/main.py\", line 9, in <module>\n    run()\n  File \"/airbyte/integration_code/source_zoho_crm/run.py\", line 15, in run\n    launch(source, sys.argv[1:])\n  File \"/usr/local/lib/python3.10/site-packages/airbyte_cdk/entrypoint.py\", line 235, in launch\n    for message in source_entrypoint.run(parsed_args):\n  File \"/usr/local/lib/python3.10/site-packages/airbyte_cdk/entrypoint.py\", line 122, in run\n    yield from map(AirbyteEntrypoint.airbyte_message_to_string, self.read(source_spec, config, config_catalog, state))\n  File \"/usr/local/lib/python3.10/site-packages/airbyte_cdk/entrypoint.py\", line 166, in read\n    for message in self.source.read(self.logger, config, catalog, state):\n  File \"/usr/local/lib/python3.10/site-packages/airbyte_cdk/sources/abstract_source.py\", line 185, in read\n    raise AirbyteTracedException(message=error_message, failure_type=FailureType.config_error)\nairbyte_cdk.utils.traced_exception.AirbyteTracedException: None\n",
  "timestamp" : 1738913400144
}, {
  "failureOrigin" : "destination",
  "failureType" : "transient_error",
  "internalMessage" : "Some streams either received an INCOMPLETE stream status, or did not receive a stream status at all: zoho-crm.incremental_notes_zoho_crm_stream, zoho-crm.incremental_deals_zoho_crm_stream, zoho-crm.incremental_cases_zoho_crm_stream, zoho-crm.incremental_attachments_zoho_crm_stream, zoho-crm.incremental_tasks_zoho_crm_stream, zoho-crm.incremental_activities_zoho_crm_stream\nio.airbyte.commons.exceptions.TransientErrorException: Some streams were unsuccessful due to a source error. See logs for details.",
  "externalMessage" : "Some streams were unsuccessful due to a source error. See logs for details.",
  "metadata" : {
    "attemptNumber" : 0,
    "jobId" : 27995180,
    "from_trace_message" : true,
    "connector_command" : "write"
  },
  "stacktrace" : "io.airbyte.commons.exceptions.TransientErrorException: Some streams were unsuccessful due to a source error. See logs for details.\n\tat io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer.close(AsyncStreamConsumer.kt:217)\n\tat io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer$Companion$appendOnClose$1.close(SerializedAirbyteMessageConsumer.kt:70)\n\tat kotlin.jdk7.AutoCloseableKt.closeFinally(AutoCloseableJVM.kt:48)\n\tat io.airbyte.cdk.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.kt:215)\n\tat io.airbyte.cdk.integrations.base.IntegrationRunner.run(IntegrationRunner.kt:119)\n\tat io.airbyte.cdk.integrations.base.IntegrationRunner.run$default(IntegrationRunner.kt:113)\n\tat io.airbyte.integrations.destination.postgres.PostgresDestinationStrictEncrypt$Companion.main(PostgresDestinationStrictEncrypt.kt:74)\n\tat io.airbyte.integrations.destination.postgres.PostgresDestinationStrictEncrypt.main(PostgresDestinationStrictEncrypt.kt)\n",
  "timestamp" : 1738913427021
}, {
  "failureOrigin" : "source",
  "internalMessage" : "Source process exited with non-zero exit code 1",
  "externalMessage" : "Something went wrong within the source connector",
  "metadata" : {
    "attemptNumber" : 0,
    "jobId" : 27995180,
    "connector_command" : "read"
  },
  "stacktrace" : "io.airbyte.workers.internal.exception.SourceException: Source process exited with non-zero exit code 1\n\tat io.airbyte.workers.general.BufferedReplicationWorker.readFromSource(BufferedReplicationWorker.java:364)\n\tat io.airbyte.workers.general.BufferedReplicationWorker.lambda$runAsyncWithHeartbeatCheck$3(BufferedReplicationWorker.java:222)\n\tat java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)\n\tat java.base/java.lang.Thread.run(Thread.java:1583)\n",
  "timestamp" : 1738913400265
}, {
  "failureOrigin" : "destination",
  "internalMessage" : "Destination process exited with non-zero exit code 1",
  "externalMessage" : "Something went wrong within the destination connector",
  "metadata" : {
    "attemptNumber" : 0,
    "jobId" : 27995180,
    "connector_command" : "write"
  },
  "stacktrace" : "io.airbyte.workers.internal.exception.DestinationException: Destination process exited with non-zero exit code 1\n\tat io.airbyte.workers.general.BufferedReplicationWorker.readFromDestination(BufferedReplicationWorker.java:502)\n\tat io.airbyte.workers.general.BufferedReplicationWorker.lambda$runAsync$2(BufferedReplicationWorker.java:215)\n\tat java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)\n\tat java.base/java.lang.Thread.run(Thread.java:1583)\n",
  "timestamp" : 1738913427068
}, {
  "failureOrigin" : "replication",
  "internalMessage" : "io.airbyte.workers.exception.WorkerException: Destination process exit with code 1. This warning is normal if the job was cancelled.",
  "externalMessage" : "Something went wrong during replication",
  "metadata" : {
    "attemptNumber" : 0,
    "jobId" : 27995180
  },
  "stacktrace" : "java.lang.RuntimeException: io.airbyte.workers.exception.WorkerException: Destination process exit with code 1. This warning is normal if the job was cancelled.\n\tat io.airbyte.workers.general.BufferedReplicationWorker$CloseableWithTimeout.lambda$close$0(BufferedReplicationWorker.java:547)\n\tat io.airbyte.workers.general.BufferedReplicationWorker.lambda$runAsyncWithTimeout$5(BufferedReplicationWorker.java:243)\n\tat java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)\n\tat java.base/java.lang.Thread.run(Thread.java:1583)\nCaused by: io.airbyte.workers.exception.WorkerException: Destination process exit with code 1. This warning is normal if the job was cancelled.\n\tat io.airbyte.workers.internal.LocalContainerAirbyteDestination.close(LocalContainerAirbyteDestination.kt:61)\n\tat io.airbyte.workers.general.BufferedReplicationWorker$CloseableWithTimeout.lambda$close$0(BufferedReplicationWorker.java:545)\n\t... 5 more\n",
  "timestamp" : 1738913427082
} ]

Contribute

  • Yes, I want to contribute
@natikgadzhi
Contributor

Thank you for filing this.

If a field is used for a cursor, and it's not present, the connector will indeed fail. Honestly, our Zoho CRM connector needs some love, we should invest and update it to the latest CDK etc etc.

I don't have an immediate fix for you in mind. If Modified_Time is not present on some records, we can't really use it as a cursor, and we need to replace it.

@MickPerring
Author

I just want to post a note here in case anyone else stumbles across this facing a similar issue.

The reason I ran into this issue in the first place was that the particular module I was trying to sync from the ZohoCRM did not have a Modified_Time field attached to the records, and I was unable to manually add the field as it gave me an error stating that the field already existed (as it is a "system field"). At some point I was somehow able to add a new Modified_Time field to the record, but then the key was set to Modified_Time1 in the API response and so the syncing issue persisted.

After some investigation and a discussion with the Zoho support team, I was able to resolve the issue by adding the Modified_By field back into the module record. As it turns out, the Modified_By and Modified_Time fields are linked on the CRM: adding or removing Modified_By adds or removes Modified_Time along with it. Once I added it to the record, both fields were returned by the API response and the connector synced successfully.
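For anyone wanting to check a module before syncing, here is a small sketch that lists which records in a saved API response lack the cursor key. It assumes the response body has already been parsed into a dict with the records under a "data" key; that shape, the sample payload, and the function name `records_missing_cursor` are assumptions for illustration, not something taken from the connector.

```python
from typing import Any

CURSOR_FIELD = "Modified_Time"


def records_missing_cursor(payload: dict[str, Any]) -> list[str]:
    # Collect the ids of records that do not carry the exact cursor key.
    missing = []
    for record in payload.get("data", []):
        if CURSOR_FIELD not in record:
            missing.append(str(record.get("id", "<no id>")))
    return missing


# Hypothetical payload: one healthy record, one with the renamed custom
# field (Modified_Time1), and one with no timestamp field at all.
payload = {
    "data": [
        {"id": "100", "Modified_Time": "2025-02-07T09:30:27+00:00"},
        {"id": "101", "Modified_Time1": "2025-02-07T09:31:00+00:00"},
        {"id": "102"},
    ]
}
missing_ids = records_missing_cursor(payload)
```

A record with a Modified_Time1 key is still reported as missing, since the connector looks up the exact name Modified_Time.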

I know this doesn't resolve the actual issue with the connector breaking when the Modified_Time field is missing, but it is at least a workaround for anyone that runs into this issue.

@natikgadzhi
Contributor

I wonder if our connector should conditionally infer Modified_Time if it's not present, but the linked fields are there.

Projects
Status: 📥 Triaging
Development

No branches or pull requests

4 participants