Ingester PSC stream misses events #270

Open
tiredpixel opened this issue Jun 27, 2024 · 7 comments

@tiredpixel
Contributor

Ingester PSC stream misses events.

This has been discussed at length elsewhere, including in #264. However, I'm creating this new ticket to make it clearer what the resolutions were.

@tiredpixel
Contributor Author

The issue has been discovered to be on our side, not an issue in the Companies House PSC stream. It has been traced to faulty handling of the HTTP stream itself. This happens before the payload is passed to the next stage, before any attempt to parse the JSON, and certainly before any actual ingestion attempt or publishing to the Kinesis stream.

https://github.com/openownership/register-common/blob/c08dd811df9874e7d2509a423530e41c1aa7df4c/lib/register_common/adapters/http_adapter.rb

It seems that incorrect handling of newlines in the stream causes some of the stream to be silently discarded in some cases. This ultimately leads to events being dropped without errors, without logging, and not even appearing when debugging the payloads received from the stream itself.

The only way to observe this issue is to debug the individual chunks coming through the stream, prior to any newline parsing. When this is done, the missing events can be seen in the raw chunks, interleaved between the events that do get parsed.

The fix will have to be to Register Common Adapters::HttpAdapter. Although the issue is only apparent in its use in Ingester PSC, that adapter is in fact used throughout other ingesters—seemingly without issue.

@tiredpixel
Contributor Author

tiredpixel commented Jun 27, 2024

TODO

  • consider effect on Ingester PSC (other than known issue with streams)
  • consider effect on Ingester DK
  • consider effect on Ingester SK
  • consider effect on Sources BODS
  • fix Register Common Adapters::HttpAdapter
  • update Register Common in Ingester PSC
  • update Register Common in Ingester DK
  • update Register Common in Ingester SK
  • verify stream fix in Ingester PSC
  • deploy to Ingester PSC streaming service in Heroku
  • verify stream fix in Ingester PSC streaming service

@tiredpixel tiredpixel moved this from Todo to In Progress in Open Ownership Register and BODS pipelines Jun 27, 2024
@tiredpixel
Contributor Author

Effect on Ingester PSC

HTTP adapter is used in lib/register_ingester_psc/config/adapters.rb.

This is in turn used in:

  1. lib/register_ingester_psc/snapshots/services/snapshot_link_scraper.rb
  2. lib/register_ingester_psc/streams/clients/psc_stream.rb
  3. lib/register_ingester_psc/snapshots/services/snapshot_downloader.rb

(1) uses RegisterCommon::Adapters::HttpAdapter get without a block. This is as defined here:
https://github.com/openownership/register-common/blob/c08dd811df9874e7d2509a423530e41c1aa7df4c/lib/register_common/adapters/http_adapter.rb#L45

That doesn't appear to have any issues. Rather, it seems like only the use of get with a block suffers from the fault.

(2) is the main issue discussed. It uses get with a block, and is faulty. This is as defined here:
https://github.com/openownership/register-common/blob/c08dd811df9874e7d2509a423530e41c1aa7df4c/lib/register_common/adapters/http_adapter.rb#L17

(3) uses get without a block. It doesn't appear to be faulty.

Effect on Ingester DK

HTTP adapter is used in register-ingester-dk/lib/register_ingester_dk/config/adapters.rb.

However, the adapter configured there doesn't actually appear to be used anywhere. It seems there is also no other direct use of HTTP libraries.

Effect on Ingester SK

HTTP adapter is used in register-ingester-sk/lib/register_ingester_sk/config/adapters.rb.

The adapter configured there also doesn't actually appear to be used anywhere, since Clients::SkClient uses Net::HTTP directly:
https://github.com/openownership/register-ingester-sk/blob/3230abe919a3e6bf3995f51e5321610982a99d7f/lib/register_ingester_sk/clients/sk_client.rb#L22

Effect on Sources BODS

HTTP adapter is used in register-sources-bods/lib/register_sources_bods/config/adapters.rb.

Again, the adapter configured there doesn't actually appear to be used anywhere. It seems there is also no other direct use of HTTP libraries.

@tiredpixel
Contributor Author

tiredpixel commented Jun 27, 2024

Reproduction of broken HTTP handling

To reproduce the broken HTTP handling and debug the stream data, it is possible to apply these patches:

Ingester PSC:

diff --git a/lib/register_ingester_psc/streams/clients/psc_stream.rb b/lib/register_ingester_psc/streams/clients/psc_stream.rb
index 1f28239..8fd2b0e 100644
--- a/lib/register_ingester_psc/streams/clients/psc_stream.rb
+++ b/lib/register_ingester_psc/streams/clients/psc_stream.rb
@@ -31,6 +31,7 @@ module RegisterIngesterPsc
               Authorization: "Basic #{basic_auth_key}"
             }
           ) do |content|
+            File.write('/tmp/psc-stream.log', content + "\n\n", mode: 'a+')
             timepoint_err = false
             parsed = JSON.parse(content, symbolize_names: true)
             match = %r{/company/(?<company_number>\w+)/}.match(parsed[:resource_uri])

Register Common:

diff --git a/lib/register_common/adapters/http_adapter.rb b/lib/register_common/adapters/http_adapter.rb
index 848bf96..89bb30d 100644
--- a/lib/register_common/adapters/http_adapter.rb
+++ b/lib/register_common/adapters/http_adapter.rb
@@ -22,6 +22,7 @@ module RegisterCommon
               URI(url), params, headers
             ) do |req|
               req.options.on_data = proc do |chunk, _overall_received_bytes|
+                File.write('/tmp/http-adapter.log', chunk + "\n", mode: 'a+')
                 current_chunk += chunk
                 lines = current_chunk.split("\n")
                 if current_chunk[-1] == "\n"

For ease of comparison, newlines are added between each stream event (as parsed, incorrectly), and each chunk (coming directly over HTTP).

Then, tailing each of these logs side-by-side shows two streams:

  1. Ingester PSC stream, as surfaced to the rest of the pipeline.
  2. Register Common stream, chunk-by-chunk, as received directly over HTTP.

Ingester PSC stream excerpt:

{"resource_kind":"company-psc-corporate","resource_uri":"/company/15619848/persons-with-significant-control/corporate-entity/SAc20LEjgpHIDrImcIh-bjX-HG8","resource_id":"SAc20LEjgpHIDrImcIh-bjX-HG8","data":{"address":{"country":"England","locality":"Northampton","postal_code":"NN3 2JS","premises":"96 Lindsay Avenue"},"etag":"075d905036586348db9ef70ba43077e91905c141","identification":{"country_registered":"England And Wales","legal_authority":"England","legal_form":"Limited Company","place_registered":"England And Wales","registration_number":"15720016"},"kind":"corporate-entity-person-with-significant-control","links":{"self":"/company/15619848/persons-with-significant-control/corporate-entity/SAc20LEjgpHIDrImcIh-bjX-HG8"},"name":"Tempeo Group Holdings Limited","natures_of_control":["ownership-of-shares-75-to-100-percent"],"notified_on":"2024-06-01"},"event":{"timepoint":11856669,"published_at":"2024-06-27T17:14:02","type":"changed"}}

{"resource_kind":"company-psc-corporate","resource_uri":"/company/10590193/persons-with-significant-control/corporate-entity/F7aBDTQZDEokt1pQ9H0Gy1fkjoU","resource_id":"F7aBDTQZDEokt1pQ9H0Gy1fkjoU","data":{"address":{"address_line_1":"29 Wellington Street","country":"United Kingdom","locality":"Leeds","postal_code":"LS1 4DL","premises":"Central Square"},"etag":"84303b37189f7c327e1eb6573a7ad45bd31b8542","identification":{"country_registered":"United Kingdom","legal_authority":"United Kingdom","legal_form":"Limited By Shares","place_registered":"Companies House","registration_number":"13355267"},"kind":"corporate-entity-person-with-significant-control","links":{"self":"/company/10590193/persons-with-significant-control/corporate-entity/F7aBDTQZDEokt1pQ9H0Gy1fkjoU"},"name":"Talgarth Uk Holdco Ltd","natures_of_control":["ownership-of-shares-75-to-100-percent","voting-rights-75-to-100-percent","right-to-appoint-and-remove-directors"],"notified_on":"2022-10-21"},"event":{"timepoint":11856671,"published_at":"2024-06-27T17:14:02","type":"changed"}}

{"resource_kind":"company-psc-corporate","resource_uri":"/company/14765796/persons-with-significant-control/corporate-entity/sqeY_OCZMzIzTA62cf50iUT7K7c","resource_id":"sqeY_OCZMzIzTA62cf50iUT7K7c","data":{"address":{"address_line_1":"1111b S Governors Ave Ste 3219","country":"United States","locality":"Dover","postal_code":"19904","premises":"1111b","region":"Delaware"},"ceased_on":"2024-03-19","etag":"c877215f898de83550fa588cc6f832f0f79a5173","identification":{"country_registered":"United States","legal_authority":"Delaware General Corporation Law","legal_form":"Delware C-Corporation","place_registered":"Division Of Corporations","registration_number":"3177301"},"kind":"corporate-entity-person-with-significant-control","links":{"self":"/company/14765796/persons-with-significant-control/corporate-entity/sqeY_OCZMzIzTA62cf50iUT7K7c"},"name":"Audiogen Inc","natures_of_control":["ownership-of-shares-75-to-100-percent","voting-rights-75-to-100-percent"],"notified_on":"2024-03-19"},"event":{"timepoint":11856673,"published_at":"2024-06-27T17:15:02.145435+01:00","type":"changed"}}

Timepoints: 11856669, 11856671, 11856673

Register Common stream chunk-by-chunk excerpt:

{"resource_kind":"company-psc-corporate","resource_uri":"/company/15619848/persons-with-significant-control/corporate-entity/SAc20LEjgpHIDrImcIh-bjX-HG8","resource_id":"SAc20LEjgpHIDrImcIh-bjX-HG8","data":{"address":{"country":"England","locality":"Northampton","postal_code":"NN3 2JS","premises":"96 Lindsay Avenue"},"etag":"075d905036586348db9ef70ba43077e91905c141","identification":{"country_registered":"England And Wales","legal_authority":"England","legal_form":"Limited Company","place_registered":"England And Wales","registration_number":"15720016"},"kind":"corporate-entity-person-with-significant-control","links":{"self":"/company/15619848/persons-with-significant-control/corporate-entity/SAc20LEjgpHIDrImcIh-bjX-HG8"},"name":"Tempeo Group Holdings Limited","natures_of_control":["ownership-of-shares-75-to-100-percent"],"notified_on":"2024-06-01"},"event":{"timepoint":11856669,"published_at":"2024-06-27T17:14:02","type":"changed"}}
{"resource_kind":"company-psc-individual","resource_uri":"/company/13462028/persons-with-significant-control/individual/09fnkDHmkkdA85GRgHpe_58ziNY","resource_id":"09fnkDHmkkdA85GRgHpe_58ziNY","data":{"address":{"address_line_1":"Peter Saunders","address_line_2":"148 Seamons Rd","country":"United Kingdom","locality":"Altrincham","postal_code":"WA14 4LJ","premises":"Mackinnon \u0026 Saunders Ltd","region":"Cheshire"},"country_of_residence":"England","date_of_birth":{"month":12,"year":
1985},"etag":"17f05f68b8a4c7e885b2d4fa8424355af40a288c","kind":"individual-person-with-significant-control","links":{"self":"/company/13462028/persons-with-significant-control/individual/09fnkDHmkkdA85GRgHpe_58ziNY"},"name":"Mr Miguel Nickardo Williams","name_elements":{"forename":"Miguel","surname":"Williams","title":"Mr"},"nationality":"Jamaican","natures_of_control":["ownership-of-shares-75-to-100-percent","voting-rights-75-to-100-percent","right-to-appoint-and-remove-directors"],"notified_on":"2021-06-17"},"event":{"timepoint":11856670,"published_at":"2024-06-27T17:14:02","type":"changed"}}

{"resource_kind":"company-psc-corporate","resource_uri":"/company/10590193/persons-with-significant-control/corporate-entity/F7aBDTQZDEokt1pQ9H0Gy1fkjoU","resource_id":"F7aBDTQZDEokt1pQ9H0Gy1fkjoU","data":{"address":{"address_line_1":"29 Wellington Street","country":"United Kingdom","locality":"Leeds","postal_code":"LS1 4DL","premises":"Central Square"},"etag":"84303b37189f7c327e1eb6573a7ad45bd31b8542","identification":{"country_registered":"United Kingdom","legal_authority":"United Kingdom","legal_form":"Limited By Shares","place_registered":"Companies House","registration_number":"13355267"},"kind":"corporate-entity-person-with-significant-control","links":{"self":"/company/10590193/persons-with-significant-control/corporate-entity/F7aBDTQZDEokt1pQ9H0Gy1fkjoU"},"name":"Talgarth Uk Holdco Ltd","natures_of_control":["ownership-of-shares-75-to-100-percent","voting-rights-75-to-100-percent","right-to-appoint-and-remove-directors"],"notified_on":"2022-10-21"},"event":{"timepoint":11856671,"published_at":"2024-06-27T17:14:02","type":"changed"}}
{"resource_kind":"company-psc-corporate","resource_uri":"/company/10590193/persons-with-significant-control/corporate-entity/F7aBDTQZDEokt1pQ9H0Gy1fkjoU","resource_id":"F7aBDTQZDEokt1pQ9H0Gy1fkjoU","data":{"address":{"address_line_1":"29 Wellington Street","country":"United Kingdom","locality":"Leeds","postal_code":"LS1 4DL","premises":"Central Square"},"etag":"84303b37189f7c3
27e1eb6573a7ad45bd31b8542","identification":{"country_registered":"United Kingdom","legal_authority":"United Kingdom","legal_form":"Limited By Shares","place_registered":"Companies House","registration_number":"13355267"},"kind":"corporate-entity-person-with-significant-control","links":{"self":"/company/10590193/persons-with-significant-control/corporate-entity/F7aBDTQZDEokt1pQ9H0Gy1fkjoU"},"name":"Talgarth Uk Holdco Ltd","natures_of_control":["ownership-of-shares-75-to-100-percent","voting-rights-75-to-100-percent","right-to-appoint-and-remove-directors"],"notified_on":"2022-10-21"},"event":{"timepoint":11856672,"published_at":"2024-06-27T17:14:02.337998+01:00","type":"changed"}}



{"resource_kind":"company-psc-corporate","resource_uri":"/company/14765796/persons-with-significant-control/corporate-entity/sqeY_OCZMzIzTA62cf50iUT7K7c","resource_id":"sqeY_OCZMzIzTA62cf50iUT7K7c","data":{"address":{"address_line_1":"1111b S Governors Ave Ste 3219","country":"United States","locality":"Dover","postal_code":"19904","premises":"1111b","region":"Delaware"},"ceased_on":"2024-03-19","etag":"c877215f898de83550fa588cc6f832f0f79a5173","identification":{"country_registered":"United States","legal_authority":"Delaware General Corporation Law","legal_form":"Delware C-Corporation","place_registered":"Division Of Corporations","registration_number":"3177301"},"kind":"corporate-entity-person-with-significant-control","links":{"self":"/company/14765796/persons-with-significant-control/corporate-entity/sqeY_OCZMzIzTA62cf50iUT7K7c"},"name":"Audiogen Inc","natures_of_control":["ownership-of-shares-75-to-100-percent","voting-rights-75-to-100-percent"],"notified_on":"2024-03-19"},"event":{"timepoint":11856673,"published_at":"2024-06-27T17:15:02.145435+01:00","type":"changed"}}

Timepoints: 11856669, 11856670, 11856671, 11856672, 11856673
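The comparison of the two logs can be automated. The following is a minimal sketch, not part of either codebase: the helper names `timepoints` and `dropped_timepoints` are hypothetical. It pulls timepoints out with a regex rather than parsing JSON, because the raw chunk log can split a record across lines. One assumed limitation: if a chunk boundary happens to fall inside the digits of a timepoint, that value would be extracted incorrectly.

```ruby
# Hypothetical helpers for comparing the two debug logs.

# Extracts every event timepoint found in the given log text.
# A regex is used instead of JSON.parse because raw chunks may be
# partial records.
def timepoints(text)
  text.scan(/"timepoint":(\d+)/).flatten.map(&:to_i)
end

# Returns timepoints that appear in the raw chunk log but never
# reached the parsed stream log.
def dropped_timepoints(chunk_log, parsed_log)
  timepoints(chunk_log) - timepoints(parsed_log)
end
```

With the patches above applied, something like `dropped_timepoints(File.read('/tmp/http-adapter.log'), File.read('/tmp/psc-stream.log'))` should list the events lost between the HTTP layer and the rest of the pipeline.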

@tiredpixel
Contributor Author

tiredpixel commented Jun 27, 2024

At first, I thought this was because of incorrect handling of the PSC stream keepalives. PSC Streaming API documentation states:

Keeping a connection alive

HTTP connections will typically be timed out and disconnected by clients if there is no data transfer for a while. To avoid this and keep a connection alive when there is no new data to stream, the streaming API periodically sends an empty record as a heartbeat. These take the form of a blank line within the streamed feed, and must be ignored by client applications.

https://developer-specs.company-information.service.gov.uk/streaming-api/guides/overview

However, Register Common HTTP adapter code seems to handle this:
https://github.com/openownership/register-common/blob/c08dd811df9874e7d2509a423530e41c1aa7df4c/lib/register_common/adapters/http_adapter.rb#L36
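For illustration only, ignoring heartbeats amounts to dropping blank lines before parsing. This is a minimal sketch and not the adapter's actual code; `parse_records` is a hypothetical name, and it assumes the lines have already been split correctly.

```ruby
require 'json'

# Hypothetical helper: parses newline-delimited JSON records, skipping
# the blank heartbeat lines the streaming API sends to keep the
# connection alive.
def parse_records(lines)
  lines.reject { |line| line.strip.empty? }
       .map { |line| JSON.parse(line, symbolize_names: true) }
end
```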

Rather, the issue appears to be incorrect handling of whether or not a stream chunk ends in a newline. Consider the following:

bfr = "a"
p bfr.split("\n")
["a"]
bfr = "a\nb\nc"
p bfr.split("\n")
["a", "b", "c"]

That is, chunks containing inner newlines can be correctly separated from a partial record using split. In the latter scenario, a is a whole record, b is a whole record, but c is a partial record, which must be buffered until more data is received (perhaps it will turn into c0 as a whole record).

However, consider the case where a chunk ends with a newline:

bfr = "a\n"
p bfr.split("\n")
["a"]
bfr = "a\nb\nc\n"
p bfr.split("\n")
["a", "b", "c"]

That is, the difference between a partial and a complete final record (ignoring the zero or more complete records which might be in the buffer prior to this) is lost.

Since these are JSON payloads, this can lead to {"c":3} (complete final record) being parsed correctly, but {"c":3 (partial final record) followed by }{"d":4} being interpreted as {"c":3}{"d":4} rather than {"c":3}\n{"d":4}. Somewhere along the way, a record gets lost.

However, note that there does appear to be code to handle this:
https://github.com/openownership/register-common/blob/c08dd811df9874e7d2509a423530e41c1aa7df4c/lib/register_common/adapters/http_adapter.rb#L27

In practice, however, this logic does not appear to be correct. I am not completely sure why, but perhaps it is related to when the newline heartbeats occur. While debugging the data coming directly from the HTTP stream, I found that sometimes the buffer would hold a partial record (OK), no record would be emitted in that pass (OK), the buffer would be appended to and become a complete record (OK), no record would be emitted in that pass (not OK), and then the buffer would again hold the complete record but still with no record emitted (not OK). This admittedly doesn't show at which stage the buffer gets discarded, but something clearly isn't working correctly.

There are a number of potential solutions, but perhaps the simplest is this:

Method: String#split

[…]
If the limit parameter is omitted, trailing null fields are suppressed. If limit is a positive number, at most that number of split substrings will be returned (captured groups will be returned as well, but are not counted towards the limit). If limit is 1, the entire string is returned as the only entry in an array. If negative, there is no limit to the number of fields returned, and trailing null fields are not suppressed.

https://www.rubydoc.info/stdlib/core/String:split

That is:

bfr = "a\nb\nc\n"
p bfr.split("\n")
["a", "b", "c"]
bfr = "a\nb\nc\n"
p bfr.split("\n", -1)
["a", "b", "c", ""]

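This can be used to simplify the code significantly: split the buffer into head and tail, always taking the last element (index -1) as the new buffer. When the buffer ends in a newline, that last element is the empty string, so nothing partial is carried over.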
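A minimal sketch of that head/tail approach follows. It is not the code that was merged into Register Common; the `LineBuffer` class and its `push` method are hypothetical names used only to illustrate the technique. Dropping empty lines here also covers the heartbeats.

```ruby
# Hypothetical line buffer illustrating split with a negative limit.
class LineBuffer
  def initialize
    @buffer = +''
  end

  # Appends a chunk and returns the complete, non-empty lines it finishes.
  def push(chunk)
    @buffer << chunk
    # With limit -1, a trailing "\n" produces a final "" element, so the
    # last element is always the partial record (possibly empty).
    *complete, tail = @buffer.split("\n", -1)
    # split returns [] for an empty string, leaving tail nil.
    @buffer = tail || +''
    # Blank lines are heartbeats and carry no record.
    complete.reject(&:empty?)
  end
end

# Usage: feed each HTTP chunk in and handle whatever lines come back.
buffer = LineBuffer.new
['{"c":', "3}\n{\"d\":4}\n"].each do |chunk|
  buffer.push(chunk).each { |line| puts line }
end
```

The design choice is that the buffer never needs a separate "does it end in a newline?" branch: the partial-versus-complete distinction is carried entirely by the last element of the split.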

@tiredpixel
Contributor Author

All the fixes have been merged and deployed. So far, so good: no gaps.

I intend to check in on it tomorrow. By then, it should definitely have started showing gaps if the problem persists, since gaps typically appear within a couple of minutes or so.

@tiredpixel
Contributor Author

Looks good. Events are still consecutive after running for around 16 hours, with no apparent gaps.

Taking a sample of 500 events:

270-cut.txt

11862989 - 11862490 + 1 = 500, so no events are missing in that sample.
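The same check can be expressed in code. This is a sketch under the assumption that timepoints are consecutive integers, as the arithmetic above implies; `missing_timepoints` is a hypothetical helper. Unlike comparing a count against the range size, listing the missing values is not fooled by duplicate events in the sample.

```ruby
# Hypothetical helper: returns the timepoints absent from a sample,
# assuming timepoints are consecutive integers.
def missing_timepoints(sample)
  return [] if sample.empty?

  (sample.min..sample.max).to_a - sample
end
```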

I think this is fixed.
