Event streamer tries to read from dropped chunks #206

Open
vidosits opened this issue Feb 7, 2024 · 5 comments · May be fixed by #207

@vidosits
Contributor

vidosits commented Feb 7, 2024

It seems to me that the event streamer is trying to read from / work with dropped hypertable chunks, which of course it can't find.

CDC (Change Data Capture) for TimescaleDB Hypertable
timescaledb-event-streamer version 0.12.1-dev (git revision unknown; branch unknown)
Loading configuration file: config.toml
[2024/02/07T15:35:33.391] [INFO] [SystemCatalog] Selected tables for replication:  
[2024/02/07T15:35:33.391] [INFO] [SystemCatalog]   * [REDACTED] (type: Hypertable, replica identity: DEFAULT)  
[2024/02/07T15:35:33.391] [INFO] [Replicator] Discovered System Information:  
[2024/02/07T15:35:33.392] [INFO] [Replicator]   * PostgreSQL version 15.3  
[2024/02/07T15:35:33.392] [INFO] [Replicator]   * TimescaleDB version 2.10.3  
[2024/02/07T15:35:33.392] [INFO] [Replicator]   * PostgreSQL System Identity 7312038712204218402  
[2024/02/07T15:35:33.392] [INFO] [Replicator]   * PostgreSQL Timeline 13  
[2024/02/07T15:35:33.392] [INFO] [Replicator]   * PostgreSQL DatabaseName smartcitydb  
[2024/02/07T15:35:33.392] [INFO] [Replicator]   * PostgreSQL Types loaded 597  
[2024/02/07T15:35:33.392] [INFO] [FileStateStorage] Starting FileStateStorage at /tmp/statestorage.dat  
[2024/02/07T15:35:33.392] [INFO] [FileStateStorage] Loading FileStateStorage at /tmp/statestorage.dat  
[2024/02/07T15:35:35.285] [INFO] [ReplicationConnection] SystemId: 7312038712204218402, Timeline: 13, XLogPos: C0C/DAE6FC28, DatabaseName: [Redacted]  
[2024/02/07T15:35:37.294] [INFO] [ReplicationChannel] Reused replication slot: event_streamer_test  
[2024/02/07T15:35:38.154] [INFO] [FileStateStorage] Stopping FileStateStorage at /tmp/statestorage.dat  
[2024/02/07T15:35:38.154] [INFO] [FileStateStorage] Storing FileStateStorage at /tmp/statestorage.dat  
[2024/02/07T15:35:38.154] [INFO] [TaskDispatcher] TaskManager shutting down  
ERROR: relation "_timescaledb_internal._hyper_1_7_chunk" does not exist (SQLSTATE 42P01)

Checking the chunks in the TimescaleDB catalog:

select * from _timescaledb_catalog.chunk where hypertable_id = 1;
id | hypertable_id | schema_name           | table_name        | compressed_chunk_id | dropped | status | osm_chunk
 7 | 1             | _timescaledb_internal | _hyper_1_7_chunk  |                     | 1       | 0      | 0
 8 | 1             | _timescaledb_internal | _hyper_1_8_chunk  |                     | 1       | 0      | 0
10 | 1             | _timescaledb_internal | _hyper_1_10_chunk |                     | 0       | 0      | 0
11 | 1             | _timescaledb_internal | _hyper_1_11_chunk |                     | 0       | 0      | 0

Checking the tables in the _timescaledb_internal schema:

SELECT table_name FROM information_schema.tables WHERE table_schema='_timescaledb_internal' AND table_type='BASE TABLE' order by table_name asc;
table_name
_hyper_1_10_chunk
_hyper_1_11_chunk
_hyper_2_2_chunk
_hyper_2_9_chunk
_materialized_hypertable_2
bgw_job_stat
bgw_policy_chunk_stats
job_errors
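
So chunks 7 and 8 are still present in the catalog but marked as dropped (dropped = 1), while their backing tables are gone from _timescaledb_internal, which matches the relation in the error above.

For completeness, a quick sketch of listing only the live (non-dropped) chunks from Go with pgx; untested, and the connection string is just a placeholder:

package main

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

func main() {
	ctx := context.Background()

	// Placeholder connection string, adjust as needed.
	conn, err := pgx.Connect(ctx, "postgres://user:password@localhost:5432/database")
	if err != nil {
		panic(err)
	}
	defer conn.Close(ctx)

	// Only chunks that still have a backing table, i.e. dropped = false.
	rows, err := conn.Query(ctx,
		`SELECT schema_name, table_name
		   FROM _timescaledb_catalog.chunk
		  WHERE hypertable_id = $1 AND NOT dropped`, 1)
	if err != nil {
		panic(err)
	}
	defer rows.Close()

	for rows.Next() {
		var schemaName, tableName string
		if err := rows.Scan(&schemaName, &tableName); err != nil {
			panic(err)
		}
		fmt.Printf("%s.%s\n", schemaName, tableName)
	}
	if err := rows.Err(); err != nil {
		panic(err)
	}
}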

My best ideas for fixing this would be to:

  • either not return dropped chunks for the selected hypertable when GetAllChunks() is called here,
  • or filter the list of returned chunks somewhere around here, since we're already filtering them at that point.

Does either of these make sense?

I've used the following patch and it worked, but I'm wondering if this is the right thing to do.

diff --git a/internal/systemcatalog/systemcatalog.go b/internal/systemcatalog/systemcatalog.go
index 6aea5ac..1655449 100644
--- a/internal/systemcatalog/systemcatalog.go
+++ b/internal/systemcatalog/systemcatalog.go
@@ -375,7 +375,9 @@ func (sc *systemCatalog) GetAllChunks() []systemcatalog.SystemEntity {
 	chunkTables := make([]systemcatalog.SystemEntity, 0)
 	for _, chunk := range sc.chunks {
 		if sc.IsHypertableSelectedForReplication(chunk.HypertableId()) {
-			chunkTables = append(chunkTables, chunk)
+			if !chunk.Dropped() {
+				chunkTables = append(chunkTables, chunk)
+			}
 		}
 	}
 	return chunkTables
@noctarius
Owner

Hey @vidosits!

Thanks for the report. I think the change is perfectly fine. It'd be awesome if you could send a pull request for it. 🙏
If you feel OK adding a unit test for it, feel free; otherwise I'll do it 👍

@vidosits
Contributor Author

vidosits commented Feb 7, 2024

Thanks, I gave the test a try, but I only got about this far before having to get back to work stuff :(

timescaledb-event-streamer/internal/systemcatalog/systemcatalog_test.go

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package systemcatalog

import (
	"github.com/jackc/pgx/v5"
	"github.com/noctarius/timescaledb-event-streamer/spi/config"
	"github.com/noctarius/timescaledb-event-streamer/spi/pgtypes"
	"github.com/noctarius/timescaledb-event-streamer/spi/systemcatalog"
	"github.com/stretchr/testify/assert"
	"testing"
)

func Test_Dropped_Chunks_Are_Excluded(
	t *testing.T,
) {
	var userConfig pgx.ConnConfig
	var testConfig config.Config

	systemCatalog, err := NewSystemCatalog(&testConfig, &userConfig, nil, nil, nil, nil, nil, nil)
	if err != nil {
		t.Fatalf("Couldn't create system catalog: %+v", err)
	}

	for i := 0; i < 10; i++ {
		// systemcatalog.NewChunk(.. .. .., dropped = true)
	}

	chunks := systemCatalog.GetAllChunks()
	assert.Equal(t, false, chunks == nil)
}

func makeHypertable(
	id int32, schemaName, tableName string,
) *systemcatalog.Hypertable {

	return systemcatalog.NewHypertable(
		id,
		schemaName,
		tableName,
		"test",
		"test",
		nil,
		0,
		false,
		nil,
		nil,
		pgtypes.DEFAULT,
	)
}

@noctarius
Owner

That's fine. I'll add the test. You took the "unit test" a bit more strictly than I expected. There are very few real unit tests, since most tests require the database to be available; they're more like integration tests. There's a whole construct around those things.

Anyway, for a bit more context: it is possible to mock out quite a few things to retrieve data, but it's gonna be complicated and you'd have to mock the whole actual database access layer. It's easier to just build the test like this one. You will have to start and stop the streamer, and you need to make sure you have state persistence working for the test.

But as I said, happy to build the test if you send a PR 👍

@vidosits
Contributor Author

vidosits commented Feb 7, 2024

Thanks! I did send #207.

RE: Tests, you're of course completely right, I just didn't know how strictly you meant unit test :)

I was gonna go with something like this initially:

func (pts *PublicationTestSuite) Test_Dropped_Chunks_Are_Not_Processed() {
	testSink := testsupport.NewEventCollectorSink()

	publicationName := lo.RandomString(10, lo.LowerCaseLettersCharset)

	var tableName string
	chunksNotToBeProcessed := make([]string, 0)

	pts.RunTest(
		func(ctx testrunner.Context) error {
			existingChunks, publishedChunks, err := readAllAndPublishedChunks(ctx, tableName, publicationName)

			if err != nil {
				return err
			}

			// *** somewhere around here we should be testing if the streamer has picked up dropped chunks or not ***
			return nil
		},

		testrunner.WithSetup(func(ctx testrunner.SetupContext) error {
			_, tn, err := ctx.CreateHypertable("ts", time.Hour,
				testsupport.NewColumn("ts", "timestamptz", false, true, nil),
				testsupport.NewColumn("val", "integer", false, false, nil),
			)
			if err != nil {
				return err
			}
			tableName = tn

			if _, err := ctx.Exec(context.Background(),
				fmt.Sprintf(
					"INSERT INTO \"%s\" SELECT ts, ROW_NUMBER() OVER (ORDER BY ts) AS val FROM GENERATE_SERIES('2023-03-25 00:00:00'::TIMESTAMPTZ, '2023-03-25 23:59:59'::TIMESTAMPTZ, INTERVAL '1 minute') t(ts)",
					tableName,
				),
			); err != nil {
				return err
			}

			if droppedChunks, err := ctx.Query(context.Background(),
				fmt.Sprintf(
					"SELECT drop_chunks('%s', '2023-03-25 12:00:00'::TIMESTAMPTZ)",
					tableName,
				),
			); err != nil {
				return err
			} else {
				for droppedChunks.Next() {
					var chunkName string
					if err := droppedChunks.Scan(&chunkName); err != nil {
						return err
					}
					chunksNotToBeProcessed = append(chunksNotToBeProcessed, chunkName)
				}
			}

			ctx.AddSystemConfigConfigurator(testSink.SystemConfigConfigurator)
			ctx.AddSystemConfigConfigurator(func(config *sysconfig.SystemConfig) {
				config.PostgreSQL.Publication.Name = publicationName
			})
			return nil
		}),
	)
}
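
For the placeholder, one rough idea could be something like this (assuming readAllAndPublishedChunks returns plain chunk-name slices, I haven't checked the actual types):

			// Make sure none of the dropped chunks are still visible to the streamer.
			for _, droppedChunk := range chunksNotToBeProcessed {
				if lo.Contains(existingChunks, droppedChunk) || lo.Contains(publishedChunks, droppedChunk) {
					return fmt.Errorf("dropped chunk %s is still visible to the streamer", droppedChunk)
				}
			}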

but then I was like, wait, does this go into the replicator tests or somewhere else? Or maybe I should just test the func directly, or whatever.

Anyways, I'll take a look once you've done it and be more productive next time :)

@noctarius
Owner

That looks like a good start. You'd have to get the replicator started once, so that the publication and everything is set up and the persistence state file knows about the existing chunks. At that point you'd suspend the replicator, which will shut it down, drop the chunk, and resume (or restart) the replicator. That should now fail, due to the state file knowing about a chunk which doesn't exist anymore (or doesn't work anymore).

At least that's my guess about what happens, but maybe that's a second case which may fail 🫣
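
Very roughly, something like this on top of your sketch (PauseReplicator / ResumeReplicator are made-up names here, the real test-support helpers might be called differently):

	pts.RunTest(
		func(ctx testrunner.Context) error {
			// At this point the replicator has already been running once, so the
			// publication and the state file know about the existing chunks.

			// Suspend the replicator (made-up helper name).
			if err := ctx.PauseReplicator(); err != nil {
				return err
			}

			// Drop chunks while the replicator is down.
			if _, err := ctx.Exec(context.Background(),
				fmt.Sprintf(
					"SELECT drop_chunks('%s', '2023-03-25 12:00:00'::TIMESTAMPTZ)",
					tableName,
				),
			); err != nil {
				return err
			}

			// Resume (or restart) the replicator (made-up helper name). Without the
			// fix, this is where the 42P01 error from above should show up, because
			// the state file still references the dropped chunks.
			return ctx.ResumeReplicator()
		},
		testrunner.WithSetup(func(ctx testrunner.SetupContext) error {
			// Same hypertable creation and initial insert as in your setup above.
			return nil
		}),
	)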
