diff --git a/tests/integration/table_truncated_err_test.go b/tests/integration/table_truncated_err_test.go index b4279a2c9..6ee72686f 100644 --- a/tests/integration/table_truncated_err_test.go +++ b/tests/integration/table_truncated_err_test.go @@ -21,167 +21,111 @@ import ( // https://github.com/ydb-platform/ydb-go-sdk/issues/798 func TestIssue798TruncatedError(t *testing.T) { const rowsLimit = 1000 - t.Run("TruncatedErrorOverTableService", func(t *testing.T) { - var ( - scope = newScope(t) - driver = scope.Driver(ydb.WithExecuteDataQueryOverQueryClient(false)) - db = scope.SQLDriver(ydb.WithQueryService(false)) - tablePath = scope.TablePath() - ) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - // clear table - { - driver.Table().Do(ctx, func(ctx context.Context, s table.Session) error { - _, _, err := s.Execute(ctx, - table.DefaultTxControl(), - fmt.Sprintf("DELETE FROM `%s`;", tablePath), - nil, - ) - if err != nil { - return err - } + var ( + scope = newScope(t) + driver = scope.Driver(ydb.WithExecuteDataQueryOverQueryClient(false)) + db = scope.SQLDriver(ydb.WithQueryService(false)) + tablePath = scope.TablePath() + ) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // clear table + { + driver.Table().Do(ctx, func(ctx context.Context, s table.Session) error { + _, _, err := s.Execute(ctx, + table.DefaultTxControl(), + fmt.Sprintf("DELETE FROM `%s`;", tablePath), + nil, + ) + if err != nil { return err - }, table.WithIdempotent()) - } - - // upsert rows - { - rows := make([]types.Value, rowsLimit) - for i := range rows { - rows[i] = types.StructValue( - types.StructFieldValue("id", types.Int64Value(int64(i))), - types.StructFieldValue("val", types.TextValue(strconv.Itoa(i))), - ) } - err := driver.Table().BulkUpsert(ctx, tablePath, - table.BulkUpsertDataRows(types.ListValue(rows...)), table.WithIdempotent(), - ) - scope.Require.NoError(err) - } - - // select rows without truncated error - { - err := driver.Table().Do(ctx, func(ctx context.Context, s table.Session) error { - _, results, err := s.Execute(ctx, - table.DefaultTxControl(), - fmt.Sprintf("SELECT * FROM `%s`;", tablePath), - nil, - ) - if err != nil { - return err - } - if err = results.NextResultSetErr(ctx); err != nil { - return fmt.Errorf("no result sets: %w", err) - } - if results.CurrentResultSet().RowCount() != rowsLimit { - return fmt.Errorf("unexpected rows count: %d", results.CurrentResultSet().RowCount()) - } - return results.Err() - }, table.WithIdempotent()) - scope.Require.NoError(err) - - err = retry.Do(ctx, db, func(ctx context.Context, cc *sql.Conn) error { - rows, err := cc.QueryContext(ctx, fmt.Sprintf("SELECT * FROM `%s`;", tablePath)) - if err != nil { - return err - } - defer func() { - _ = rows.Close() - }() - count := 0 - for rows.Next() { - count++ - } - if count != rowsLimit { - return fmt.Errorf("unexpected rows count: %d", count) - } - - return rows.Err() - }, retry.WithIdempotent(true)) - scope.Require.NoError(err) - } - - // upsert 1 row for get 1001 rows and truncated error - { - err := driver.Table().BulkUpsert(ctx, tablePath, - table.BulkUpsertDataRows(types.ListValue(types.StructValue( - types.StructFieldValue("id", types.Int64Value(rowsLimit)), - types.StructFieldValue("val", types.TextValue(strconv.Itoa(rowsLimit))), - ))), - table.WithIdempotent(), + return err + }, table.WithIdempotent()) + } + + // upsert rows + { + rows := make([]types.Value, rowsLimit) + for i := range rows { + rows[i] = types.StructValue( + types.StructFieldValue("id", types.Int64Value(int64(i))), + types.StructFieldValue("val", types.TextValue(strconv.Itoa(i))), ) - scope.Require.NoError(err) } - - // select all rows with truncated result error - { - { - err := driver.Table().Do(ctx, func(ctx context.Context, s table.Session) error { - _, results, err := s.Execute(ctx, - table.DefaultTxControl(), - fmt.Sprintf("SELECT * FROM `%s`;", tablePath), - nil, - ) - if err != nil { - return err - } - if err = results.NextResultSetErr(ctx); err != nil { - return fmt.Errorf("no result sets: %w", err) - } - - rowsCount := results.CurrentResultSet().RowCount() - - if rowsCount != rowsLimit { - return fmt.Errorf("unexpected rows count: %d", rowsCount) - } - - return results.Err() // expected truncated error for execute data query using table client - }, table.WithIdempotent()) - scope.Require.ErrorIs(err, result.ErrTruncated) + err := driver.Table().BulkUpsert(ctx, tablePath, + table.BulkUpsertDataRows(types.ListValue(rows...)), table.WithIdempotent(), + ) + scope.Require.NoError(err) + } + + // select rows without truncated error + { + err := driver.Table().Do(ctx, func(ctx context.Context, s table.Session) error { + _, results, err := s.Execute(ctx, + table.DefaultTxControl(), + fmt.Sprintf("SELECT * FROM `%s`;", tablePath), + nil, + ) + if err != nil { + return err + } + if err = results.NextResultSetErr(ctx); err != nil { + return fmt.Errorf("no result sets: %w", err) } + if results.CurrentResultSet().RowCount() != rowsLimit { + return fmt.Errorf("unexpected rows count: %d", results.CurrentResultSet().RowCount()) + } + + return results.Err() + }, table.WithIdempotent()) + scope.Require.NoError(err) - { - var rowsCount int - err := retry.Do(ctx, db, func(ctx context.Context, cc *sql.Conn) error { - rowsCount = 0 - rows, err := cc.QueryContext(ctx, fmt.Sprintf("SELECT * FROM `%s`;", tablePath)) - if err != nil { - return err - } - defer func() { - _ = rows.Close() - }() - for rows.Next() { - rowsCount++ - } - - if err := rows.Err(); err != nil { - return err - } - - if rowsCount != rowsLimit { - return fmt.Errorf("unexpected rows count: %d", rowsCount) - } - - return nil - }, retry.WithIdempotent(true)) - scope.Require.ErrorIs(err, result.ErrTruncated) + err = retry.Do(ctx, db, func(ctx context.Context, cc *sql.Conn) error { + rows, err := cc.QueryContext(ctx, fmt.Sprintf("SELECT * FROM `%s`;", tablePath)) + if err != nil { + return err + } + defer func() { + _ = rows.Close() + }() + count := 0 + for rows.Next() { + count++ + } + if count != rowsLimit { + return fmt.Errorf("unexpected rows count: %d", count) } - } - // select all rows without truncated result error + return rows.Err() + }, retry.WithIdempotent(true)) + scope.Require.NoError(err) + } + + // upsert 1 row for get 1001 rows and truncated error + { + err := driver.Table().BulkUpsert(ctx, tablePath, + table.BulkUpsertDataRows(types.ListValue(types.StructValue( + types.StructFieldValue("id", types.Int64Value(rowsLimit)), + types.StructFieldValue("val", types.TextValue(strconv.Itoa(rowsLimit))), + ))), + table.WithIdempotent(), + ) + scope.Require.NoError(err) + } + + // select all rows with truncated result error + { { err := driver.Table().Do(ctx, func(ctx context.Context, s table.Session) error { _, results, err := s.Execute(ctx, table.DefaultTxControl(), fmt.Sprintf("SELECT * FROM `%s`;", tablePath), nil, - options.WithIgnoreTruncated(), ) if err != nil { return err @@ -189,41 +133,22 @@ func TestIssue798TruncatedError(t *testing.T) { if err = results.NextResultSetErr(ctx); err != nil { return fmt.Errorf("no result sets: %w", err) } - if rowsCount := results.CurrentResultSet().RowCount(); rowsCount != rowsLimit { + + rowsCount := results.CurrentResultSet().RowCount() + + if rowsCount != rowsLimit { return fmt.Errorf("unexpected rows count: %d", rowsCount) } - return nil + return results.Err() // expected truncated error for execute data query using table client }, table.WithIdempotent()) - scope.Require.NoError(err) + scope.Require.ErrorIs(err, result.ErrTruncated) } - // connect with default option ignore truncated without truncated result error { - driver, err := driver.With(ctx, ydb.WithIgnoreTruncated()) - scope.Require.NoError(err) - - err = driver.Table().Do(ctx, func(ctx context.Context, s table.Session) error { - _, results, err := s.Execute(ctx, - table.DefaultTxControl(), - fmt.Sprintf("SELECT * FROM `%s`;", tablePath), - nil, - ) - if err != nil { - return err - } - if err = results.NextResultSetErr(ctx); err != nil { - return fmt.Errorf("no result sets: %w", err) - } - if results.CurrentResultSet().RowCount() != rowsLimit { - return fmt.Errorf("unexpected rows count: %d", results.CurrentResultSet().RowCount()) - } - return results.Err() // expected nil - }, table.WithIdempotent()) - scope.Require.NoError(err) - - db = sql.OpenDB(ydb.MustConnector(driver)) - err = retry.Do(ctx, db, func(ctx context.Context, cc *sql.Conn) error { + var rowsCount int + err := retry.Do(ctx, db, func(ctx context.Context, cc *sql.Conn) error { + rowsCount = 0 rows, err := cc.QueryContext(ctx, fmt.Sprintf("SELECT * FROM `%s`;", tablePath)) if err != nil { return err @@ -231,165 +156,191 @@ func TestIssue798TruncatedError(t *testing.T) { defer func() { _ = rows.Close() }() - count := 0 for rows.Next() { - count++ + rowsCount++ } - return rows.Err() - }, retry.WithIdempotent(true)) - scope.Require.NoError(err) - } - }) - t.Run("NoTruncatedErrorOverQueryService", func(t *testing.T) { - t.Skip() - - var ( - scope = newScope(t) - driver = scope.Driver(ydb.WithExecuteDataQueryOverQueryClient(true)) - db = scope.SQLDriver(ydb.WithQueryService(true)) - tablePath = scope.TablePath() - ) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - // clear table - { - driver.Table().Do(ctx, func(ctx context.Context, s table.Session) error { - _, _, err := s.Execute(ctx, - table.DefaultTxControl(), - fmt.Sprintf("DELETE FROM `%s`;", tablePath), - nil, - ) - if err != nil { + if err := rows.Err(); err != nil { return err } - return err - }, table.WithIdempotent()) - } - - // upsert rows - { - rows := make([]types.Value, rowsLimit) - for i := range rows { - rows[i] = types.StructValue( - types.StructFieldValue("id", types.Int64Value(int64(i))), - types.StructFieldValue("val", types.TextValue(strconv.Itoa(i))), - ) - } - err := driver.Table().BulkUpsert(ctx, tablePath, - table.BulkUpsertDataRows(types.ListValue(rows...)), table.WithIdempotent(), - ) - scope.Require.NoError(err) - } - - // select rows without truncated error - { - err := driver.Table().Do(ctx, func(ctx context.Context, s table.Session) error { - _, results, err := s.Execute(ctx, - table.DefaultTxControl(), - fmt.Sprintf("SELECT * FROM `%s`;", tablePath), - nil, - ) - if err != nil { - return err - } - if err = results.NextResultSetErr(ctx); err != nil { - return fmt.Errorf("no result sets: %w", err) - } - if results.CurrentResultSet().RowCount() != rowsLimit { - return fmt.Errorf("unexpected rows count: %d", results.CurrentResultSet().RowCount()) - } - - return results.Err() - }, table.WithIdempotent()) - scope.Require.NoError(err) - - err = retry.Do(ctx, db, func(ctx context.Context, cc *sql.Conn) error { - rows, err := cc.QueryContext(ctx, fmt.Sprintf("SELECT * FROM `%s`;", tablePath)) - if err != nil { - return err - } - defer func() { - _ = rows.Close() - }() - count := 0 - for rows.Next() { - count++ - } - if count != rowsLimit { - return fmt.Errorf("unexpected rows count: %d", count) + if rowsCount != rowsLimit { + return fmt.Errorf("unexpected rows count: %d", rowsCount) } - return rows.Err() + return nil }, retry.WithIdempotent(true)) - scope.Require.NoError(err) + scope.Require.ErrorIs(err, result.ErrTruncated) } - - // upsert 1 row for get 1001 rows and truncated error - { - err := driver.Table().BulkUpsert(ctx, tablePath, - table.BulkUpsertDataRows(types.ListValue(types.StructValue( - types.StructFieldValue("id", types.Int64Value(rowsLimit)), - types.StructFieldValue("val", types.TextValue(strconv.Itoa(rowsLimit))), - ))), - table.WithIdempotent(), + } + + // select all rows without truncated result error + { + err := driver.Table().Do(ctx, func(ctx context.Context, s table.Session) error { + _, results, err := s.Execute(ctx, + table.DefaultTxControl(), + fmt.Sprintf("SELECT * FROM `%s`;", tablePath), + nil, + options.WithIgnoreTruncated(), ) - scope.Require.NoError(err) - } + if err != nil { + return err + } + if err = results.NextResultSetErr(ctx); err != nil { + return fmt.Errorf("no result sets: %w", err) + } + if rowsCount := results.CurrentResultSet().RowCount(); rowsCount != rowsLimit { + return fmt.Errorf("unexpected rows count: %d", rowsCount) + } - // select all rows with truncated result error - { - { - err := driver.Table().Do(ctx, func(ctx context.Context, s table.Session) error { - _, results, err := s.Execute(ctx, - table.DefaultTxControl(), - fmt.Sprintf("SELECT * FROM `%s`;", tablePath), - nil, - ) - if err != nil { - return err - } - if err = results.NextResultSetErr(ctx); err != nil { - return fmt.Errorf("no result sets: %w", err) - } - - rowsCount := results.CurrentResultSet().RowCount() - if rowsCount != rowsLimit+1 { - return fmt.Errorf("unexpected rows count: %d", rowsCount) - } - - return results.Err() // expected truncated error for execute data query using table client - }, table.WithIdempotent()) - scope.Require.NoError(err) + return nil + }, table.WithIdempotent()) + scope.Require.NoError(err) + } + + // connect with default option ignore truncated without truncated result error + { + driver, err := driver.With(ctx, ydb.WithIgnoreTruncated()) + scope.Require.NoError(err) + + err = driver.Table().Do(ctx, func(ctx context.Context, s table.Session) error { + _, results, err := s.Execute(ctx, + table.DefaultTxControl(), + fmt.Sprintf("SELECT * FROM `%s`;", tablePath), + nil, + ) + if err != nil { + return err + } + if err = results.NextResultSetErr(ctx); err != nil { + return fmt.Errorf("no result sets: %w", err) + } + if results.CurrentResultSet().RowCount() != rowsLimit { + return fmt.Errorf("unexpected rows count: %d", results.CurrentResultSet().RowCount()) } + return results.Err() // expected nil + }, table.WithIdempotent()) + scope.Require.NoError(err) + + db = sql.OpenDB(ydb.MustConnector(driver)) + err = retry.Do(ctx, db, func(ctx context.Context, cc *sql.Conn) error { + rows, err := cc.QueryContext(ctx, fmt.Sprintf("SELECT * FROM `%s`;", tablePath)) + if err != nil { + return err + } + defer func() { + _ = rows.Close() + }() + count := 0 + for rows.Next() { + count++ + } + return rows.Err() + }, retry.WithIdempotent(true)) + scope.Require.NoError(err) + } +} - { - var rowsCount int - err := retry.Do(ctx, db, func(ctx context.Context, cc *sql.Conn) error { - rowsCount = 0 - rows, err := cc.QueryContext(ctx, fmt.Sprintf("SELECT * FROM `%s`;", tablePath)) - if err != nil { - return err - } - defer func() { - _ = rows.Close() - }() - for rows.Next() { - rowsCount++ - } - if rowsCount != rowsLimit+1 { - return fmt.Errorf("unexpected rows count: %d", rowsCount) - } - - return rows.Err() - }, retry.WithIdempotent(true)) - scope.Require.NoError(err) +func TestTestIssue798NoTruncatedErrorOverQueryService(t *testing.T) { + const rowsLimit = 1000 + var ( + scope = newScope(t) + driver = scope.Driver(ydb.WithExecuteDataQueryOverQueryClient(true)) + db = scope.SQLDriver(ydb.WithQueryService(true)) + tablePath = scope.TablePath() + ) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // clear table + { + driver.Table().Do(ctx, func(ctx context.Context, s table.Session) error { + _, _, err := s.Execute(ctx, + table.DefaultTxControl(), + fmt.Sprintf("DELETE FROM `%s`;", tablePath), + nil, + ) + if err != nil { + return err } + + return err + }, table.WithIdempotent()) + } + + // upsert rows + { + rows := make([]types.Value, rowsLimit) + for i := range rows { + rows[i] = types.StructValue( + types.StructFieldValue("id", types.Int64Value(int64(i))), + types.StructFieldValue("val", types.TextValue(strconv.Itoa(i))), + ) } + err := driver.Table().BulkUpsert(ctx, tablePath, + table.BulkUpsertDataRows(types.ListValue(rows...)), table.WithIdempotent(), + ) + scope.Require.NoError(err) + } + + // select rows without truncated error + { + err := driver.Table().Do(ctx, func(ctx context.Context, s table.Session) error { + _, results, err := s.Execute(ctx, + table.DefaultTxControl(), + fmt.Sprintf("SELECT * FROM `%s`;", tablePath), + nil, + ) + if err != nil { + return err + } + if err = results.NextResultSetErr(ctx); err != nil { + return fmt.Errorf("no result sets: %w", err) + } + if results.CurrentResultSet().RowCount() != rowsLimit { + return fmt.Errorf("unexpected rows count: %d", results.CurrentResultSet().RowCount()) + } - // select all rows without truncated result error + return results.Err() + }, table.WithIdempotent()) + scope.Require.NoError(err) + + err = retry.Do(ctx, db, func(ctx context.Context, cc *sql.Conn) error { + rows, err := cc.QueryContext(ctx, fmt.Sprintf("SELECT * FROM `%s`;", tablePath)) + if err != nil { + return err + } + defer func() { + _ = rows.Close() + }() + count := 0 + for rows.Next() { + count++ + } + if count != rowsLimit { + return fmt.Errorf("unexpected rows count: %d", count) + } + + return rows.Err() + }, retry.WithIdempotent(true)) + scope.Require.NoError(err) + } + + // upsert 1 row for get 1001 rows and truncated error + { + err := driver.Table().BulkUpsert(ctx, tablePath, + table.BulkUpsertDataRows(types.ListValue(types.StructValue( + types.StructFieldValue("id", types.Int64Value(rowsLimit)), + types.StructFieldValue("val", types.TextValue(strconv.Itoa(rowsLimit))), + ))), + table.WithIdempotent(), + ) + scope.Require.NoError(err) + } + + // select all rows with truncated result error + { { err := driver.Table().Do(ctx, func(ctx context.Context, s table.Session) error { _, results, err := s.Execute(ctx, @@ -403,40 +354,21 @@ func TestIssue798TruncatedError(t *testing.T) { if err = results.NextResultSetErr(ctx); err != nil { return fmt.Errorf("no result sets: %w", err) } - if results.CurrentResultSet().RowCount() != rowsLimit+1 { - return fmt.Errorf("unexpected rows count: %d", results.CurrentResultSet().RowCount()) + + rowsCount := results.CurrentResultSet().RowCount() + if rowsCount != rowsLimit+1 { + return fmt.Errorf("unexpected rows count: %d", rowsCount) } - return results.Err() // expected nil + + return results.Err() // expected truncated error for execute data query using table client }, table.WithIdempotent()) scope.Require.NoError(err) } - // connect with default option ignore truncated without truncated result error { - driver, err := driver.With(ctx, ydb.WithIgnoreTruncated()) - scope.Require.NoError(err) - - err = driver.Table().Do(ctx, func(ctx context.Context, s table.Session) error { - _, results, err := s.Execute(ctx, - table.DefaultTxControl(), - fmt.Sprintf("SELECT * FROM `%s`;", tablePath), - nil, - ) - if err != nil { - return err - } - if err = results.NextResultSetErr(ctx); err != nil { - return fmt.Errorf("no result sets: %w", err) - } - if results.CurrentResultSet().RowCount() != rowsLimit+1 { - return fmt.Errorf("unexpected rows count: %d", results.CurrentResultSet().RowCount()) - } - return results.Err() // expected nil - }, table.WithIdempotent()) - scope.Require.NoError(err) - - db = sql.OpenDB(ydb.MustConnector(driver)) - err = retry.Do(ctx, db, func(ctx context.Context, cc *sql.Conn) error { + var rowsCount int + err := retry.Do(ctx, db, func(ctx context.Context, cc *sql.Conn) error { + rowsCount = 0 rows, err := cc.QueryContext(ctx, fmt.Sprintf("SELECT * FROM `%s`;", tablePath)) if err != nil { return err @@ -444,13 +376,80 @@ func TestIssue798TruncatedError(t *testing.T) { defer func() { _ = rows.Close() }() - count := 0 for rows.Next() { - count++ + rowsCount++ } + if rowsCount != rowsLimit+1 { + return fmt.Errorf("unexpected rows count: %d", rowsCount) + } + return rows.Err() }, retry.WithIdempotent(true)) scope.Require.NoError(err) } - }) + } + + // select all rows without truncated result error + { + err := driver.Table().Do(ctx, func(ctx context.Context, s table.Session) error { + _, results, err := s.Execute(ctx, + table.DefaultTxControl(), + fmt.Sprintf("SELECT * FROM `%s`;", tablePath), + nil, + ) + if err != nil { + return err + } + if err = results.NextResultSetErr(ctx); err != nil { + return fmt.Errorf("no result sets: %w", err) + } + if results.CurrentResultSet().RowCount() != rowsLimit+1 { + return fmt.Errorf("unexpected rows count: %d", results.CurrentResultSet().RowCount()) + } + return results.Err() // expected nil + }, table.WithIdempotent()) + scope.Require.NoError(err) + } + + // connect with default option ignore truncated without truncated result error + { + driver, err := driver.With(ctx, ydb.WithIgnoreTruncated()) + scope.Require.NoError(err) + + err = driver.Table().Do(ctx, func(ctx context.Context, s table.Session) error { + _, results, err := s.Execute(ctx, + table.DefaultTxControl(), + fmt.Sprintf("SELECT * FROM `%s`;", tablePath), + nil, + ) + if err != nil { + return err + } + if err = results.NextResultSetErr(ctx); err != nil { + return fmt.Errorf("no result sets: %w", err) + } + if results.CurrentResultSet().RowCount() != rowsLimit+1 { + return fmt.Errorf("unexpected rows count: %d", results.CurrentResultSet().RowCount()) + } + return results.Err() // expected nil + }, table.WithIdempotent()) + scope.Require.NoError(err) + + db = sql.OpenDB(ydb.MustConnector(driver)) + err = retry.Do(ctx, db, func(ctx context.Context, cc *sql.Conn) error { + rows, err := cc.QueryContext(ctx, fmt.Sprintf("SELECT * FROM `%s`;", tablePath)) + if err != nil { + return err + } + defer func() { + _ = rows.Close() + }() + count := 0 + for rows.Next() { + count++ + } + return rows.Err() + }, retry.WithIdempotent(true)) + scope.Require.NoError(err) + } }