Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug] Doris Source When the parallelism is set to greater than 1, the read data will be lost #503

Closed
2 of 3 tasks
Misaki030112 opened this issue Oct 24, 2024 · 5 comments
Closed
2 of 3 tasks

Comments

@Misaki030112
Copy link

Misaki030112 commented Oct 24, 2024

Search before asking

  • I had searched in the issues and found no similar issues.

Version

24.0.1

What's Wrong?

I use flink-doris-connector to read the full data of a table, and then write it to kafka through flink kafka sink

I found that if I set the parallelism level to be greater than 1 for the Source, It will occasionally lose data, and sometimes only read a small portion of the data.

The following are screenshots of my experiment
PS: The number of rows of my data is 6405008
image

  1. When I set my Source parallelism to 6, it only reads a very small portion of the data
    image

  2. When I set my Source parallelism to 2, the result is the same
    image

  3. It seems that only when the parallelism is 1 can he read the complete data. Why is this?
    image

What You Expected?

I expect that when the Source parallelism is greater than 1, it should be able to read the complete data.

I looked at the Doris connector code carefully. I guess the process of assigning splits to read each split is fine. The problem is reading the data inside a DorisSplitRecords. It seems that it closes before reading all the tablets data in a split.

How to Reproduce?

flink version : 1.19-scala_2.12-java11
flink-doris-connector version: flink-doris-connector-1.18:24.0.1

flink task code

public class DorisToKafka {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        env.enableCheckpointing(10000);


        DorisSource<List<?>> dorisSource = DorisSource.<List<?>>builder()
                .setDorisReadOptions(
                        DorisReadOptions.builder()
                                .setRequestTabletSize(50)
                                .setRequestRetries(10)
                                .setDeserializeArrowAsync(Boolean.TRUE)
                                .setRequestReadTimeoutMs(60 * 1000)
                                .setRequestBatchSize(4096)
                                .build()
                )
                .setDorisOptions(
                        DorisOptions.builder()
                                .setFenodes("doris-fe-01:8030")
                                .setTableIdentifier("DB01.nyc_taxi_fare_data")
                                .setUsername("root")
                                .setPassword("")
                                .build()
                )
                .setDeserializer(
                        new SimpleListDeserializationSchema()
                )
                .build();

        DataStreamSource<List<?>> dorisDataStream = env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris-source");
        dorisDataStream.setParallelism(1);

        KafkaSink<List<?>> kafkaSink = KafkaSink.<List<?>>builder()
                .setBootstrapServers("kafka-broker-1:9092")
                .setRecordSerializer(new KafkaRecordSerializationSchema<List<?>>() {
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(List<?> rowData, KafkaSinkContext kafkaSinkContext, Long aLong) {
                        try {
                            String jsonString = null;
                            jsonString = ObjectMapperSingleton.getInstance().writeValueAsString(rowData);
                            byte[] value = jsonString.getBytes();
                            return new ProducerRecord<>("doris-kafka-topic", null, value);
                        } catch (JsonProcessingException e) {
                            e.printStackTrace();
                            throw new RuntimeException(e);
                        }
                    }
                })
                .build();


        dorisDataStream.sinkTo(kafkaSink).setParallelism(4);
        env.execute("doris-to-kafka");
    }
}

My doris table creation statement

CREATE TABLE DB01.nyc_taxi_fare_data(
  VendorID SMALLINT NULL COMMENT "A unique identifier for the taxi vendor or service provider.",
  tpep_pickup_datetime DATE NOT NULL COMMENT "The date and time when the passenger was picked up.",
  tpep_dropoff_datetime DATE NOT NULL COMMENT "The date and time when the passenger was dropped off.",
  passenger_count TINYINT COMMENT "The number of passengers in the taxi.",
  trip_distance FLOAT COMMENT "The total distance of the trip in miles or kilometers.",
  RatecodeID TINYINT COMMENT "The rate code assigned to the trip, representing fare types.",
  store_and_fwd_flag CHAR(1) COMMENT "Indicates whether the trip data was stored locally and then forwarded later (Y/N).",
  PULocationID SMALLINT COMMENT "The unique identifier for the pickup location (zone or area).",
  DOLocationID SMALLINT COMMENT "The unique identifier for the drop-off location (zone or area).",
  payment_type TINYINT COMMENT "The method of payment used by the passenger (e.g., cash, card).",
  fare_amount DOUBLE COMMENT "The base fare for the trip.",
  extra SMALLINT COMMENT "Additional charges applied during the trip (e.g., night surcharge).",
  mta_tax FLOAT COMMENT "The tax imposed by the Metropolitan Transportation Authority.",
  tip_amount SMALLINT COMMENT "The tip given to the driver, if applicable.",
  tolls_amount FLOAT COMMENT "The total amount of tolls charged during the trip.",
  improvement_surcharge FLOAT COMMENT "A surcharge imposed for the improvement of services.",
  total_amount DOUBLE COMMENT "The total fare amount, including all charges and surcharges.",
  congestion_surcharge FLOAT COMMENT "An additional charge for trips taken during high traffic congestion times."
) 
PARTITION BY RANGE(tpep_pickup_datetime) (
  FROM ("2003-01-01") TO ("2020-01-01") INTERVAL 1 YEAR,
  FROM ("2020-01-01") TO ("2021-01-03") INTERVAL 1 MONTH
)
DISTRIBUTED BY HASH(PULocationID) BUCKETS 265
PROPERTIES (
    "replication_num" = "1"
);

Anything Else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@JNSimba
Copy link
Member

JNSimba commented Oct 25, 2024

What is the Doris version? Does taskmanager report an error?
This PR #502 fixes the data loss caused by error retry during reading

@Misaki030112
Copy link
Author

Misaki030112 commented Oct 25, 2024

What is the Doris version? Does taskmanager report an error? This PR #502 fixes the data loss caused by error retry during reading

Doris Version: 3.0.1
According to the official document, start the image with docker compose, 1 fe node and 3 be nodes


Flink's TaskManager does not have any errors
image

image

I don't see any error logs related to flink-doris-connector. In my previous tests, I saw some warning logs with the content "The status of open scanner result from ......." These logs do not seem to have any impact on the code, but it seems that the read operation is closed immediately after the split is allocated, and very little data is actually read.

@Misaki030112
Copy link
Author

What is the Doris version? Does taskmanager report an error? This PR #502 fixes the data loss caused by error retry during reading

This PR #502 doesn't seem to solve my problem.
It seems that it just exposes the exception that data cannot be read after retrying and reduces redundant logs.

This is the task running result of recompiling flink-doris-connector after merging this PR

image

tm log (Section About Source Section)

2024-10-25 03:37:31,807 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: doris-source (1/6)#0 (0f33b5b7444e11245dcb129a5cb9e6b6_bc764cd8ddf7a0cff126f51c16239658_0_0) switched from DEPLOYING to INITIALIZING.
2024-10-25 03:37:31,807 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: doris-source (2/6)#0 (0f33b5b7444e11245dcb129a5cb9e6b6_bc764cd8ddf7a0cff126f51c16239658_1_0) switched from DEPLOYING to INITIALIZING.
2024-10-25 03:37:32,090 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: doris-source (2/6)#0 (0f33b5b7444e11245dcb129a5cb9e6b6_bc764cd8ddf7a0cff126f51c16239658_1_0) switched from INITIALIZING to RUNNING.
2024-10-25 03:37:32,090 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: doris-source (1/6)#0 (0f33b5b7444e11245dcb129a5cb9e6b6_bc764cd8ddf7a0cff126f51c16239658_0_0) switched from INITIALIZING to RUNNING.
2024-10-25 03:37:32,107 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-01:9060_59,be=doris-be-01:9060,tablets=[138754, 136704, 138752, 138758, 136708, 138756, 136714, 130570, 138766, 130572, 138770, 130576, 138768, 130578, 136726, 130580, 136724, 136732, 138780, 130590, 136738, 138786, 130592, 138784, 130594, 136742, 130596, 138788, 130598, 136746, 138794, 138712, 136668, 136672, 138720, 136676, 138724, 130534, 130536, 130538, 138734, 130540, 138738, 138736, 136694, 136692, 136696, 130554, 130556, 136700]]
2024-10-25 03:37:32,107 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-01:9060_60,be=doris-be-01:9060,tablets=[130600, 136750, 130604, 130606, 136754, 138800, 136762, 136760, 130618, 138814, 136766, 136764, 138818, 136770, 130624, 136774, 138820, 130630, 130632, 136780, 138832, 136784, 130642, 138836, 130646, 138842, 136792, 130650, 138846, 136798, 130652, 138844, 136796, 138850, 138848, 136800, 138854, 130660, 136804, 130662, 136810, 130664, 136808, 138860, 130670, 136818, 130672, 138864, 130676, 138872]]
2024-10-25 03:37:32,109 INFO  org.apache.doris.flink.source.reader.DorisSourceReader       [] - Initialized reader state for split: DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-01:9060_59,be=doris-be-01:9060,tablets=[138754, 136704, 138752, 138758, 136708, 138756, 136714, 130570, 138766, 130572, 138770, 130576, 138768, 130578, 136726, 130580, 136724, 136732, 138780, 130590, 136738, 138786, 130592, 138784, 130594, 136742, 130596, 138788, 130598, 136746, 138794, 138712, 136668, 136672, 138720, 136676, 138724, 130534, 130536, 130538, 138734, 130540, 138738, 138736, 136694, 136692, 136696, 130554, 130556, 136700]
2024-10-25 03:37:32,109 INFO  org.apache.doris.flink.source.reader.DorisSourceReader       [] - Initialized reader state for split: DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-01:9060_60,be=doris-be-01:9060,tablets=[130600, 136750, 130604, 130606, 136754, 138800, 136762, 136760, 130618, 138814, 136766, 136764, 138818, 136770, 130624, 136774, 138820, 130630, 130632, 136780, 138832, 136784, 130642, 138836, 130646, 138842, 136792, 130650, 138846, 136798, 130652, 138844, 136796, 138850, 138848, 136800, 138854, 130660, 136804, 130662, 136810, 130664, 136808, 138860, 130670, 136818, 130672, 138864, 130676, 138872]
2024-10-25 03:37:32,115 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 0
2024-10-25 03:37:32,115 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 0
2024-10-25 03:37:32,117 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - Handling split change SplitAddition:[[DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-01:9060_60,be=doris-be-01:9060,tablets=[130600, 136750, 130604, 130606, 136754, 138800, 136762, 136760, 130618, 138814, 136766, 136764, 138818, 136770, 130624, 136774, 138820, 130630, 130632, 136780, 138832, 136784, 130642, 138836, 130646, 138842, 136792, 130650, 138846, 136798, 130652, 138844, 136796, 138850, 138848, 136800, 138854, 130660, 136804, 130662, 136810, 130664, 136808, 138860, 130670, 136818, 130672, 138864, 130676, 138872]]]
2024-10-25 03:37:32,117 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - Handling split change SplitAddition:[[DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-01:9060_59,be=doris-be-01:9060,tablets=[138754, 136704, 138752, 138758, 136708, 138756, 136714, 130570, 138766, 130572, 138770, 130576, 138768, 130578, 136726, 130580, 136724, 136732, 138780, 130590, 136738, 138786, 130592, 138784, 130594, 136742, 130596, 138788, 130598, 136746, 138794, 138712, 136668, 136672, 138720, 136676, 138724, 130534, 130536, 130538, 138734, 130540, 138738, 138736, 136694, 136692, 136696, 130554, 130556, 136700]]]
2024-10-25 03:37:32,119 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - Fetch a new split DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-01:9060_59,be=doris-be-01:9060,tablets=[138754, 136704, 138752, 138758, 136708, 138756, 136714, 130570, 138766, 130572, 138770, 130576, 138768, 130578, 136726, 130580, 136724, 136732, 138780, 130590, 136738, 138786, 130592, 138784, 130594, 136742, 130596, 138788, 130598, 136746, 138794, 138712, 136668, 136672, 138720, 136676, 138724, 130534, 130536, 130538, 138734, 130540, 138738, 138736, 136694, 136692, 136696, 130554, 130556, 136700]
2024-10-25 03:37:32,122 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - create reader for partition: PartitionDefinition{database='DB01', table='nyc_taxi_fare_data', beAddress='doris-be-01:9060', tabletIds=[138754, 136704, 138752, 138758, 136708, 138756, 136714, 130570, 138766, 130572, 138770, 130576, 138768, 130578, 136726, 130580, 136724, 136732, 138780, 130590, 136738, 138786, 130592, 138784, 130594, 136742, 130596, 138788, 130598, 136746, 138794, 138712, 136668, 136672, 138720, 136676, 138724, 130534, 130536, 130538, 138734, 130540, 138738, 138736, 136694, 136692, 136696, 130554, 130556, 136700]}
2024-10-25 03:37:32,119 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - Fetch a new split DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-01:9060_60,be=doris-be-01:9060,tablets=[130600, 136750, 130604, 130606, 136754, 138800, 136762, 136760, 130618, 138814, 136766, 136764, 138818, 136770, 130624, 136774, 138820, 130630, 130632, 136780, 138832, 136784, 130642, 138836, 130646, 138842, 136792, 130650, 138846, 136798, 130652, 138844, 136796, 138850, 138848, 136800, 138854, 130660, 136804, 130662, 136810, 130664, 136808, 138860, 130670, 136818, 130672, 138864, 130676, 138872]
2024-10-25 03:37:32,122 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - create reader for partition: PartitionDefinition{database='DB01', table='nyc_taxi_fare_data', beAddress='doris-be-01:9060', tabletIds=[130600, 136750, 130604, 130606, 136754, 138800, 136762, 136760, 130618, 138814, 136766, 136764, 138818, 136770, 130624, 136774, 138820, 130630, 130632, 136780, 138832, 136784, 130642, 138836, 130646, 138842, 136792, 130650, 138846, 136798, 130652, 138844, 136796, 138850, 138848, 136800, 138854, 130660, 136804, 130662, 136810, 130664, 136808, 138860, 130670, 136818, 130672, 138864, 130676, 138872]}
2024-10-25 03:37:32,138 INFO  org.apache.doris.flink.backend.BackendClient                 [] - Success connect to Doris BE{host='doris-be-01', port=9060}.
2024-10-25 03:37:32,138 INFO  org.apache.doris.flink.backend.BackendClient                 [] - Success connect to Doris BE{host='doris-be-01', port=9060}.
2024-10-25 03:37:32,196 INFO  org.apache.doris.flink.backend.BackendClient                 [] - OpenScanner success for Doris BE 'Doris BE{host='doris-be-01', port=9060}' with contextId '74ec41a5-d864-436b-a340-3f000d13c59a' for tablets '[130600, 136750, 130604, 130606, 136754, 138800, 136762, 136760, 130618, 138814, 136766, 136764, 138818, 136770, 130624, 136774, 138820, 130630, 130632, 136780, 138832, 136784, 130642, 138836, 130646, 138842, 136792, 130650, 138846, 136798, 130652, 138844, 136796, 138850, 138848, 136800, 138854, 130660, 136804, 130662, 136810, 130664, 136808, 138860, 130670, 136818, 130672, 138864, 130676, 138872]'.
2024-10-25 03:37:32,197 INFO  org.apache.doris.flink.backend.BackendClient                 [] - OpenScanner success for Doris BE 'Doris BE{host='doris-be-01', port=9060}' with contextId '4de7bc9e-63d9-4981-87bf-f4e7f5b084b9' for tablets '[138754, 136704, 138752, 138758, 136708, 138756, 136714, 130570, 138766, 130572, 138770, 130576, 138768, 130578, 136726, 130580, 136724, 136732, 138780, 130590, 136738, 138786, 130592, 138784, 130594, 136742, 130596, 138788, 130598, 136746, 138794, 138712, 136668, 136672, 138720, 136676, 138724, 130534, 130536, 130538, 138734, 130540, 138738, 138736, 136694, 136692, 136696, 130554, 130556, 136700]'.
2024-10-25 03:37:32,228 INFO  org.apache.doris.shaded.org.apache.arrow.memory.BaseAllocator [] - Debug mode disabled. Enable with the VM option -Darrow.memory.debug.allocator=true.
2024-10-25 03:37:32,232 INFO  org.apache.doris.shaded.org.apache.arrow.memory.DefaultAllocationManagerOption [] - allocation manager type not specified, using netty as the default type
2024-10-25 03:37:32,233 INFO  org.apache.doris.shaded.org.apache.arrow.memory.CheckAllocator [] - Using DefaultAllocationManager at memory/DefaultAllocationManagerFactory.class
2024-10-25 03:37:33,642 INFO  org.apache.doris.flink.source.reader.DorisValueReader        [] - Scan finished, tablets: [130600, 136750, 130604, 130606, 136754, 138800, 136762, 136760, 130618, 138814, 136766, 136764, 138818, 136770, 130624, 136774, 138820, 130630, 130632, 136780, 138832, 136784, 130642, 138836, 130646, 138842, 136792, 130650, 138846, 136798, 130652, 138844, 136796, 138850, 138848, 136800, 138854, 130660, 136804, 130662, 136810, 130664, 136808, 138860, 130670, 136818, 130672, 138864, 130676, 138872], offset: 1648
2024-10-25 03:37:33,655 INFO  org.apache.doris.flink.backend.BackendClient                 [] - CloseScanner to Doris BE 'Doris BE{host='doris-be-01', port=9060}' success for contextId 74ec41a5-d864-436b-a340-3f000d13c59a 
2024-10-25 03:37:33,656 INFO  org.apache.doris.flink.backend.BackendClient                 [] - Closed a connection to Doris BE{host='doris-be-01', port=9060}.
2024-10-25 03:37:33,658 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished reading split(s) [doris-be-01:9060_60]
2024-10-25 03:37:33,662 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished reading from splits [doris-be-01:9060_60]
2024-10-25 03:37:33,663 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Closing splitFetcher 0 because it is idle.
2024-10-25 03:37:33,664 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Shutting down split fetcher 0
2024-10-25 03:37:33,664 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split fetcher 0 exited.
2024-10-25 03:37:33,683 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-01:9060_51,be=doris-be-01:9060,tablets=[136096, 136102, 140198, 136100, 129958, 140202, 136106, 136104, 140206, 136110, 140204, 138162, 136114, 140212, 136122, 136120, 129978, 140222, 129982, 140226, 138178, 136130, 138176, 136128, 138182, 129988, 129990, 129992, 140232, 138184, 136136, 136142, 129996, 140236, 136140, 130000, 140246, 138198, 138196, 130006, 140250, 136154, 130008, 140248, 138200, 136152, 138204, 138210, 136162, 130016]]
2024-10-25 03:37:33,684 INFO  org.apache.doris.flink.source.reader.DorisSourceReader       [] - Initialized reader state for split: DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-01:9060_51,be=doris-be-01:9060,tablets=[136096, 136102, 140198, 136100, 129958, 140202, 136106, 136104, 140206, 136110, 140204, 138162, 136114, 140212, 136122, 136120, 129978, 140222, 129982, 140226, 138178, 136130, 138176, 136128, 138182, 129988, 129990, 129992, 140232, 138184, 136136, 136142, 129996, 140236, 136140, 130000, 140246, 138198, 138196, 130006, 140250, 136154, 130008, 140248, 138200, 136152, 138204, 138210, 136162, 130016]
2024-10-25 03:37:33,687 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 1
2024-10-25 03:37:33,688 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - Handling split change SplitAddition:[[DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-01:9060_51,be=doris-be-01:9060,tablets=[136096, 136102, 140198, 136100, 129958, 140202, 136106, 136104, 140206, 136110, 140204, 138162, 136114, 140212, 136122, 136120, 129978, 140222, 129982, 140226, 138178, 136130, 138176, 136128, 138182, 129988, 129990, 129992, 140232, 138184, 136136, 136142, 129996, 140236, 136140, 130000, 140246, 138198, 138196, 130006, 140250, 136154, 130008, 140248, 138200, 136152, 138204, 138210, 136162, 130016]]]
2024-10-25 03:37:33,693 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - Fetch a new split DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-01:9060_51,be=doris-be-01:9060,tablets=[136096, 136102, 140198, 136100, 129958, 140202, 136106, 136104, 140206, 136110, 140204, 138162, 136114, 140212, 136122, 136120, 129978, 140222, 129982, 140226, 138178, 136130, 138176, 136128, 138182, 129988, 129990, 129992, 140232, 138184, 136136, 136142, 129996, 140236, 136140, 130000, 140246, 138198, 138196, 130006, 140250, 136154, 130008, 140248, 138200, 136152, 138204, 138210, 136162, 130016]
2024-10-25 03:37:33,694 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - create reader for partition: PartitionDefinition{database='DB01', table='nyc_taxi_fare_data', beAddress='doris-be-01:9060', tabletIds=[136096, 136102, 140198, 136100, 129958, 140202, 136106, 136104, 140206, 136110, 140204, 138162, 136114, 140212, 136122, 136120, 129978, 140222, 129982, 140226, 138178, 136130, 138176, 136128, 138182, 129988, 129990, 129992, 140232, 138184, 136136, 136142, 129996, 140236, 136140, 130000, 140246, 138198, 138196, 130006, 140250, 136154, 130008, 140248, 138200, 136152, 138204, 138210, 136162, 130016]}
2024-10-25 03:37:33,697 INFO  org.apache.doris.flink.backend.BackendClient                 [] - Success connect to Doris BE{host='doris-be-01', port=9060}.
2024-10-25 03:37:33,716 INFO  org.apache.doris.flink.backend.BackendClient                 [] - OpenScanner success for Doris BE 'Doris BE{host='doris-be-01', port=9060}' with contextId 'c5471d2a-822b-4d74-bb0c-8287f70f06cf' for tablets '[136096, 136102, 140198, 136100, 129958, 140202, 136106, 136104, 140206, 136110, 140204, 138162, 136114, 140212, 136122, 136120, 129978, 140222, 129982, 140226, 138178, 136130, 138176, 136128, 138182, 129988, 129990, 129992, 140232, 138184, 136136, 136142, 129996, 140236, 136140, 130000, 140246, 138198, 138196, 130006, 140250, 136154, 130008, 140248, 138200, 136152, 138204, 138210, 136162, 130016]'.
2024-10-25 03:37:33,722 INFO  org.apache.doris.flink.source.reader.DorisValueReader        [] - Scan finished, tablets: [136096, 136102, 140198, 136100, 129958, 140202, 136106, 136104, 140206, 136110, 140204, 138162, 136114, 140212, 136122, 136120, 129978, 140222, 129982, 140226, 138178, 136130, 138176, 136128, 138182, 129988, 129990, 129992, 140232, 138184, 136136, 136142, 129996, 140236, 136140, 130000, 140246, 138198, 138196, 130006, 140250, 136154, 130008, 140248, 138200, 136152, 138204, 138210, 136162, 130016], offset: 0
2024-10-25 03:37:33,724 INFO  org.apache.doris.flink.backend.BackendClient                 [] - CloseScanner to Doris BE 'Doris BE{host='doris-be-01', port=9060}' success for contextId c5471d2a-822b-4d74-bb0c-8287f70f06cf 
2024-10-25 03:37:33,724 INFO  org.apache.doris.flink.backend.BackendClient                 [] - Closed a connection to Doris BE{host='doris-be-01', port=9060}.
2024-10-25 03:37:33,725 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished reading split(s) [doris-be-01:9060_51]
2024-10-25 03:37:33,731 INFO  org.apache.doris.flink.source.reader.DorisValueReader        [] - Scan finished, tablets: [138754, 136704, 138752, 138758, 136708, 138756, 136714, 130570, 138766, 130572, 138770, 130576, 138768, 130578, 136726, 130580, 136724, 136732, 138780, 130590, 136738, 138786, 130592, 138784, 130594, 136742, 130596, 138788, 130598, 136746, 138794, 138712, 136668, 136672, 138720, 136676, 138724, 130534, 130536, 130538, 138734, 130540, 138738, 138736, 136694, 136692, 136696, 130554, 130556, 136700], offset: 4197
2024-10-25 03:37:33,725 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished reading from splits [doris-be-01:9060_51]
2024-10-25 03:37:33,733 INFO  org.apache.doris.flink.backend.BackendClient                 [] - CloseScanner to Doris BE 'Doris BE{host='doris-be-01', port=9060}' success for contextId 4de7bc9e-63d9-4981-87bf-f4e7f5b084b9 
2024-10-25 03:37:33,734 INFO  org.apache.doris.flink.backend.BackendClient                 [] - Closed a connection to Doris BE{host='doris-be-01', port=9060}.
2024-10-25 03:37:33,734 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished reading from splits [doris-be-01:9060_59]
2024-10-25 03:37:33,734 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished reading split(s) [doris-be-01:9060_59]
2024-10-25 03:37:33,735 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Closing splitFetcher 0 because it is idle.
2024-10-25 03:37:33,736 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Shutting down split fetcher 0
2024-10-25 03:37:33,735 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Closing splitFetcher 1 because it is idle.
2024-10-25 03:37:33,737 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Shutting down split fetcher 1
2024-10-25 03:37:33,737 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split fetcher 1 exited.
2024-10-25 03:37:33,736 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split fetcher 0 exited.
2024-10-25 03:37:33,746 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-01:9060_47,be=doris-be-01:9060,tablets=[139920, 137878, 137876, 139930, 135832, 139928, 135838, 139934, 139932, 137890, 135840, 137888, 139942, 135844, 137892, 135850, 139944, 137900, 139948, 137904, 137910, 139958, 135860, 137908, 139956, 139962, 137912, 139960, 139966, 135868, 135874, 139970, 139972, 135882, 137930, 139982, 139980, 139986, 135888, 139990, 129748, 137940, 139988, 129750, 135898, 129752, 139992, 137950, 129756, 135900]]
2024-10-25 03:37:33,747 INFO  org.apache.doris.flink.source.reader.DorisSourceReader       [] - Initialized reader state for split: DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-01:9060_47,be=doris-be-01:9060,tablets=[139920, 137878, 137876, 139930, 135832, 139928, 135838, 139934, 139932, 137890, 135840, 137888, 139942, 135844, 137892, 135850, 139944, 137900, 139948, 137904, 137910, 139958, 135860, 137908, 139956, 139962, 137912, 139960, 139966, 135868, 135874, 139970, 139972, 135882, 137930, 139982, 139980, 139986, 135888, 139990, 129748, 137940, 139988, 129750, 135898, 129752, 139992, 137950, 129756, 135900]
2024-10-25 03:37:33,747 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 2
2024-10-25 03:37:33,752 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - Handling split change SplitAddition:[[DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-01:9060_47,be=doris-be-01:9060,tablets=[139920, 137878, 137876, 139930, 135832, 139928, 135838, 139934, 139932, 137890, 135840, 137888, 139942, 135844, 137892, 135850, 139944, 137900, 139948, 137904, 137910, 139958, 135860, 137908, 139956, 139962, 137912, 139960, 139966, 135868, 135874, 139970, 139972, 135882, 137930, 139982, 139980, 139986, 135888, 139990, 129748, 137940, 139988, 129750, 135898, 129752, 139992, 137950, 129756, 135900]]]
2024-10-25 03:37:33,754 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - Fetch a new split DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-01:9060_47,be=doris-be-01:9060,tablets=[139920, 137878, 137876, 139930, 135832, 139928, 135838, 139934, 139932, 137890, 135840, 137888, 139942, 135844, 137892, 135850, 139944, 137900, 139948, 137904, 137910, 139958, 135860, 137908, 139956, 139962, 137912, 139960, 139966, 135868, 135874, 139970, 139972, 135882, 137930, 139982, 139980, 139986, 135888, 139990, 129748, 137940, 139988, 129750, 135898, 129752, 139992, 137950, 129756, 135900]
2024-10-25 03:37:33,754 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - create reader for partition: PartitionDefinition{database='DB01', table='nyc_taxi_fare_data', beAddress='doris-be-01:9060', tabletIds=[139920, 137878, 137876, 139930, 135832, 139928, 135838, 139934, 139932, 137890, 135840, 137888, 139942, 135844, 137892, 135850, 139944, 137900, 139948, 137904, 137910, 139958, 135860, 137908, 139956, 139962, 137912, 139960, 139966, 135868, 135874, 139970, 139972, 135882, 137930, 139982, 139980, 139986, 135888, 139990, 129748, 137940, 139988, 129750, 135898, 129752, 139992, 137950, 129756, 135900]}
2024-10-25 03:37:33,756 INFO  org.apache.doris.flink.backend.BackendClient                 [] - Success connect to Doris BE{host='doris-be-01', port=9060}.
2024-10-25 03:37:33,761 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-01:9060_44,be=doris-be-01:9060,tablets=[127488, 127490, 135686, 127492, 135600, 137652, 135610, 127416, 127418, 137662, 127422, 137664, 135616, 127426, 127428, 137668, 137674, 137672, 135628, 137682, 127440, 135632, 127442, 137686, 137684, 137690, 127448, 137688, 135640, 137692, 137698, 127456, 137696, 135648, 135654, 137700, 137710, 127468, 137708, 127470, 137714, 127474, 135670, 135668, 137722, 137720, 135672, 137726, 127484, 137724]]
2024-10-25 03:37:33,761 INFO  org.apache.doris.flink.source.reader.DorisSourceReader       [] - Initialized reader state for split: DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-01:9060_44,be=doris-be-01:9060,tablets=[127488, 127490, 135686, 127492, 135600, 137652, 135610, 127416, 127418, 137662, 127422, 137664, 135616, 127426, 127428, 137668, 137674, 137672, 135628, 137682, 127440, 135632, 127442, 137686, 137684, 137690, 127448, 137688, 135640, 137692, 137698, 127456, 137696, 135648, 135654, 137700, 137710, 127468, 137708, 127470, 137714, 127474, 135670, 135668, 137722, 137720, 135672, 137726, 127484, 137724]
2024-10-25 03:37:33,762 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 1
2024-10-25 03:37:33,762 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - Handling split change SplitAddition:[[DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-01:9060_44,be=doris-be-01:9060,tablets=[127488, 127490, 135686, 127492, 135600, 137652, 135610, 127416, 127418, 137662, 127422, 137664, 135616, 127426, 127428, 137668, 137674, 137672, 135628, 137682, 127440, 135632, 127442, 137686, 137684, 137690, 127448, 137688, 135640, 137692, 137698, 127456, 137696, 135648, 135654, 137700, 137710, 127468, 137708, 127470, 137714, 127474, 135670, 135668, 137722, 137720, 135672, 137726, 127484, 137724]]]
2024-10-25 03:37:33,763 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - Fetch a new split DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-01:9060_44,be=doris-be-01:9060,tablets=[127488, 127490, 135686, 127492, 135600, 137652, 135610, 127416, 127418, 137662, 127422, 137664, 135616, 127426, 127428, 137668, 137674, 137672, 135628, 137682, 127440, 135632, 127442, 137686, 137684, 137690, 127448, 137688, 135640, 137692, 137698, 127456, 137696, 135648, 135654, 137700, 137710, 127468, 137708, 127470, 137714, 127474, 135670, 135668, 137722, 137720, 135672, 137726, 127484, 137724]
2024-10-25 03:37:33,763 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - create reader for partition: PartitionDefinition{database='DB01', table='nyc_taxi_fare_data', beAddress='doris-be-01:9060', tabletIds=[127488, 127490, 135686, 127492, 135600, 137652, 135610, 127416, 127418, 137662, 127422, 137664, 135616, 127426, 127428, 137668, 137674, 137672, 135628, 137682, 127440, 135632, 127442, 137686, 137684, 137690, 127448, 137688, 135640, 137692, 137698, 127456, 137696, 135648, 135654, 137700, 137710, 127468, 137708, 127470, 137714, 127474, 135670, 135668, 137722, 137720, 135672, 137726, 127484, 137724]}
2024-10-25 03:37:33,764 INFO  org.apache.doris.flink.backend.BackendClient                 [] - Success connect to Doris BE{host='doris-be-01', port=9060}.
2024-10-25 03:37:33,776 INFO  org.apache.doris.flink.backend.BackendClient                 [] - OpenScanner success for Doris BE 'Doris BE{host='doris-be-01', port=9060}' with contextId '6c62fd9f-fd7a-4116-b744-71e154455d7f' for tablets '[139920, 137878, 137876, 139930, 135832, 139928, 135838, 139934, 139932, 137890, 135840, 137888, 139942, 135844, 137892, 135850, 139944, 137900, 139948, 137904, 137910, 139958, 135860, 137908, 139956, 139962, 137912, 139960, 139966, 135868, 135874, 139970, 139972, 135882, 137930, 139982, 139980, 139986, 135888, 139990, 129748, 137940, 139988, 129750, 135898, 129752, 139992, 137950, 129756, 135900]'.
2024-10-25 03:37:33,797 INFO  org.apache.doris.flink.backend.BackendClient                 [] - OpenScanner success for Doris BE 'Doris BE{host='doris-be-01', port=9060}' with contextId 'e1f12a2e-2552-4786-9d6d-d88813302ec7' for tablets '[127488, 127490, 135686, 127492, 135600, 137652, 135610, 127416, 127418, 137662, 127422, 137664, 135616, 127426, 127428, 137668, 137674, 137672, 135628, 137682, 127440, 135632, 127442, 137686, 137684, 137690, 127448, 137688, 135640, 137692, 137698, 127456, 137696, 135648, 135654, 137700, 137710, 127468, 137708, 127470, 137714, 127474, 135670, 135668, 137722, 137720, 135672, 137726, 127484, 137724]'.
2024-10-25 03:37:33,804 INFO  org.apache.doris.flink.source.reader.DorisValueReader        [] - Scan finished, tablets: [127488, 127490, 135686, 127492, 135600, 137652, 135610, 127416, 127418, 137662, 127422, 137664, 135616, 127426, 127428, 137668, 137674, 137672, 135628, 137682, 127440, 135632, 127442, 137686, 137684, 137690, 127448, 137688, 135640, 137692, 137698, 127456, 137696, 135648, 135654, 137700, 137710, 127468, 137708, 127470, 137714, 127474, 135670, 135668, 137722, 137720, 135672, 137726, 127484, 137724], offset: 0
2024-10-25 03:37:33,805 INFO  org.apache.doris.flink.backend.BackendClient                 [] - CloseScanner to Doris BE 'Doris BE{host='doris-be-01', port=9060}' success for contextId e1f12a2e-2552-4786-9d6d-d88813302ec7 
2024-10-25 03:37:33,805 INFO  org.apache.doris.flink.backend.BackendClient                 [] - Closed a connection to Doris BE{host='doris-be-01', port=9060}.
2024-10-25 03:37:33,806 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished reading from splits [doris-be-01:9060_44]
2024-10-25 03:37:33,806 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished reading split(s) [doris-be-01:9060_44]
2024-10-25 03:37:33,808 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Closing splitFetcher 1 because it is idle.
2024-10-25 03:37:33,808 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Shutting down split fetcher 1
2024-10-25 03:37:33,808 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split fetcher 1 exited.
2024-10-25 03:37:33,809 INFO  org.apache.doris.flink.source.reader.DorisValueReader        [] - Scan finished, tablets: [139920, 137878, 137876, 139930, 135832, 139928, 135838, 139934, 139932, 137890, 135840, 137888, 139942, 135844, 137892, 135850, 139944, 137900, 139948, 137904, 137910, 139958, 135860, 137908, 139956, 139962, 137912, 139960, 139966, 135868, 135874, 139970, 139972, 135882, 137930, 139982, 139980, 139986, 135888, 139990, 129748, 137940, 139988, 129750, 135898, 129752, 139992, 137950, 129756, 135900], offset: 1
2024-10-25 03:37:33,813 INFO  org.apache.doris.flink.backend.BackendClient                 [] - CloseScanner to Doris BE 'Doris BE{host='doris-be-01', port=9060}' success for contextId 6c62fd9f-fd7a-4116-b744-71e154455d7f 
2024-10-25 03:37:33,814 INFO  org.apache.doris.flink.backend.BackendClient                 [] - Closed a connection to Doris BE{host='doris-be-01', port=9060}.
2024-10-25 03:37:33,814 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished reading from splits [doris-be-01:9060_47]
2024-10-25 03:37:33,814 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished reading split(s) [doris-be-01:9060_47]
2024-10-25 03:37:33,816 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Closing splitFetcher 2 because it is idle.
2024-10-25 03:37:33,816 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Shutting down split fetcher 2
2024-10-25 03:37:33,817 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split fetcher 2 exited.
2024-10-25 03:37:33,831 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-01:9060_40,be=doris-be-01:9060,tablets=[137346, 127104, 137344, 127106, 135302, 137350, 135300, 137354, 127112, 137358, 127116, 135308, 127118, 127122, 127126, 135322, 137370, 127128, 137368, 127132, 135330, 135328, 137376, 135334, 137380, 127142, 135338, 137384, 135342, 137390, 127152, 135344, 137392, 137398, 127156, 137400, 127162, 135356, 135268, 135272, 137324, 135282, 137330, 137328, 135284, 137332, 127098, 135294, 127100, 137340]]
2024-10-25 03:37:33,831 INFO  org.apache.doris.flink.source.reader.DorisSourceReader       [] - Initialized reader state for split: DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-01:9060_40,be=doris-be-01:9060,tablets=[137346, 127104, 137344, 127106, 135302, 137350, 135300, 137354, 127112, 137358, 127116, 135308, 127118, 127122, 127126, 135322, 137370, 127128, 137368, 127132, 135330, 135328, 137376, 135334, 137380, 127142, 135338, 137384, 135342, 137390, 127152, 135344, 137392, 137398, 127156, 137400, 127162, 135356, 135268, 135272, 137324, 135282, 137330, 137328, 135284, 137332, 127098, 135294, 127100, 137340]
2024-10-25 03:37:33,832 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 2
2024-10-25 03:37:33,832 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - Handling split change SplitAddition:[[DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-01:9060_40,be=doris-be-01:9060,tablets=[137346, 127104, 137344, 127106, 135302, 137350, 135300, 137354, 127112, 137358, 127116, 135308, 127118, 127122, 127126, 135322, 137370, 127128, 137368, 127132, 135330, 135328, 137376, 135334, 137380, 127142, 135338, 137384, 135342, 137390, 127152, 135344, 137392, 137398, 127156, 137400, 127162, 135356, 135268, 135272, 137324, 135282, 137330, 137328, 135284, 137332, 127098, 135294, 127100, 137340]]]
2024-10-25 03:37:33,832 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - Fetch a new split DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-01:9060_40,be=doris-be-01:9060,tablets=[137346, 127104, 137344, 127106, 135302, 137350, 135300, 137354, 127112, 137358, 127116, 135308, 127118, 127122, 127126, 135322, 137370, 127128, 137368, 127132, 135330, 135328, 137376, 135334, 137380, 127142, 135338, 137384, 135342, 137390, 127152, 135344, 137392, 137398, 127156, 137400, 127162, 135356, 135268, 135272, 137324, 135282, 137330, 137328, 135284, 137332, 127098, 135294, 127100, 137340]
2024-10-25 03:37:33,832 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - create reader for partition: PartitionDefinition{database='DB01', table='nyc_taxi_fare_data', beAddress='doris-be-01:9060', tabletIds=[137346, 127104, 137344, 127106, 135302, 137350, 135300, 137354, 127112, 137358, 127116, 135308, 127118, 127122, 127126, 135322, 137370, 127128, 137368, 127132, 135330, 135328, 137376, 135334, 137380, 127142, 135338, 137384, 135342, 137390, 127152, 135344, 137392, 137398, 127156, 137400, 127162, 135356, 135268, 135272, 137324, 135282, 137330, 137328, 135284, 137332, 127098, 135294, 127100, 137340]}
2024-10-25 03:37:33,833 INFO  org.apache.doris.flink.backend.BackendClient                 [] - Success connect to Doris BE{host='doris-be-01', port=9060}.
2024-10-25 03:37:33,840 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-03:9060_38,be=doris-be-03:9060,tablets=[139250, 135154, 139248, 137200, 137204, 137198, 139244, 135148, 139260, 135164]]
2024-10-25 03:37:33,841 INFO  org.apache.doris.flink.source.reader.DorisSourceReader       [] - Initialized reader state for split: DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-03:9060_38,be=doris-be-03:9060,tablets=[139250, 135154, 139248, 137200, 137204, 137198, 139244, 135148, 139260, 135164]
2024-10-25 03:37:33,842 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 3
2024-10-25 03:37:33,842 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - Handling split change SplitAddition:[[DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-03:9060_38,be=doris-be-03:9060,tablets=[139250, 135154, 139248, 137200, 137204, 137198, 139244, 135148, 139260, 135164]]]
2024-10-25 03:37:33,843 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - Fetch a new split DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-03:9060_38,be=doris-be-03:9060,tablets=[139250, 135154, 139248, 137200, 137204, 137198, 139244, 135148, 139260, 135164]
2024-10-25 03:37:33,843 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - create reader for partition: PartitionDefinition{database='DB01', table='nyc_taxi_fare_data', beAddress='doris-be-03:9060', tabletIds=[139250, 135154, 139248, 137200, 137204, 137198, 139244, 135148, 139260, 135164]}
2024-10-25 03:37:33,845 INFO  org.apache.doris.flink.backend.BackendClient                 [] - OpenScanner success for Doris BE 'Doris BE{host='doris-be-01', port=9060}' with contextId '6aa4f176-43f8-45a7-a0f7-e2aac30fd4b4' for tablets '[137346, 127104, 137344, 127106, 135302, 137350, 135300, 137354, 127112, 137358, 127116, 135308, 127118, 127122, 127126, 135322, 137370, 127128, 137368, 127132, 135330, 135328, 137376, 135334, 137380, 127142, 135338, 137384, 135342, 137390, 127152, 135344, 137392, 137398, 127156, 137400, 127162, 135356, 135268, 135272, 137324, 135282, 137330, 137328, 135284, 137332, 127098, 135294, 127100, 137340]'.
2024-10-25 03:37:33,850 INFO  org.apache.doris.flink.backend.BackendClient                 [] - Success connect to Doris BE{host='doris-be-03', port=9060}.
2024-10-25 03:37:33,860 INFO  org.apache.doris.flink.backend.BackendClient                 [] - OpenScanner success for Doris BE 'Doris BE{host='doris-be-03', port=9060}' with contextId '594c785c-7cd5-42ff-8c33-16c0801d321e' for tablets '[139250, 135154, 139248, 137200, 137204, 137198, 139244, 135148, 139260, 135164]'.
2024-10-25 03:37:33,869 INFO  org.apache.doris.flink.source.reader.DorisValueReader        [] - Scan finished, tablets: [139250, 135154, 139248, 137200, 137204, 137198, 139244, 135148, 139260, 135164], offset: 4
2024-10-25 03:37:33,870 INFO  org.apache.doris.flink.backend.BackendClient                 [] - CloseScanner to Doris BE 'Doris BE{host='doris-be-03', port=9060}' success for contextId 594c785c-7cd5-42ff-8c33-16c0801d321e 
2024-10-25 03:37:33,870 INFO  org.apache.doris.flink.backend.BackendClient                 [] - Closed a connection to Doris BE{host='doris-be-03', port=9060}.
2024-10-25 03:37:33,870 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished reading from splits [doris-be-03:9060_38]
2024-10-25 03:37:33,871 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Closing splitFetcher 3 because it is idle.
2024-10-25 03:37:33,872 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Shutting down split fetcher 3
2024-10-25 03:37:33,872 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished reading split(s) [doris-be-03:9060_38]
2024-10-25 03:37:33,873 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split fetcher 3 exited.
2024-10-25 03:37:33,876 INFO  org.apache.doris.flink.source.reader.DorisValueReader        [] - Scan finished, tablets: [137346, 127104, 137344, 127106, 135302, 137350, 135300, 137354, 127112, 137358, 127116, 135308, 127118, 127122, 127126, 135322, 137370, 127128, 137368, 127132, 135330, 135328, 137376, 135334, 137380, 127142, 135338, 137384, 135342, 137390, 127152, 135344, 137392, 137398, 127156, 137400, 127162, 135356, 135268, 135272, 137324, 135282, 137330, 137328, 135284, 137332, 127098, 135294, 127100, 137340], offset: 8
2024-10-25 03:37:33,877 INFO  org.apache.doris.flink.backend.BackendClient                 [] - CloseScanner to Doris BE 'Doris BE{host='doris-be-01', port=9060}' success for contextId 6aa4f176-43f8-45a7-a0f7-e2aac30fd4b4 
2024-10-25 03:37:33,878 INFO  org.apache.doris.flink.backend.BackendClient                 [] - Closed a connection to Doris BE{host='doris-be-01', port=9060}.
2024-10-25 03:37:33,878 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished reading from splits [doris-be-01:9060_40]
2024-10-25 03:37:33,878 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished reading split(s) [doris-be-01:9060_40]
2024-10-25 03:37:33,879 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Closing splitFetcher 2 because it is idle.
2024-10-25 03:37:33,879 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Shutting down split fetcher 2
2024-10-25 03:37:33,879 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split fetcher 2 exited.
2024-10-25 03:37:33,894 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-03:9060_35,be=doris-be-03:9060,tablets=[136962, 138910, 136860, 138914, 130722, 138918, 130724, 138916, 136872, 130730, 136878, 138924, 130734, 138934, 130742, 136890, 138942, 130748, 138946, 136896, 136900, 130758, 130760, 138952, 136904, 130762, 138958, 130764, 130768, 138960, 136912, 138966, 130774, 138970, 136922, 138968, 136920, 130780, 136924, 136930, 130784, 138976, 138982, 130790, 138986, 136938, 130804, 138996, 136948, 136952]]
2024-10-25 03:37:33,894 INFO  org.apache.doris.flink.source.reader.DorisSourceReader       [] - Initialized reader state for split: DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-03:9060_35,be=doris-be-03:9060,tablets=[136962, 138910, 136860, 138914, 130722, 138918, 130724, 138916, 136872, 130730, 136878, 138924, 130734, 138934, 130742, 136890, 138942, 130748, 138946, 136896, 136900, 130758, 130760, 138952, 136904, 130762, 138958, 130764, 130768, 138960, 136912, 138966, 130774, 138970, 136922, 138968, 136920, 130780, 136924, 136930, 130784, 138976, 138982, 130790, 138986, 136938, 130804, 138996, 136948, 136952]
2024-10-25 03:37:33,895 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 4
2024-10-25 03:37:33,895 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - Handling split change SplitAddition:[[DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-03:9060_35,be=doris-be-03:9060,tablets=[136962, 138910, 136860, 138914, 130722, 138918, 130724, 138916, 136872, 130730, 136878, 138924, 130734, 138934, 130742, 136890, 138942, 130748, 138946, 136896, 136900, 130758, 130760, 138952, 136904, 130762, 138958, 130764, 130768, 138960, 136912, 138966, 130774, 138970, 136922, 138968, 136920, 130780, 136924, 136930, 130784, 138976, 138982, 130790, 138986, 136938, 130804, 138996, 136948, 136952]]]
2024-10-25 03:37:33,895 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - Fetch a new split DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-03:9060_35,be=doris-be-03:9060,tablets=[136962, 138910, 136860, 138914, 130722, 138918, 130724, 138916, 136872, 130730, 136878, 138924, 130734, 138934, 130742, 136890, 138942, 130748, 138946, 136896, 136900, 130758, 130760, 138952, 136904, 130762, 138958, 130764, 130768, 138960, 136912, 138966, 130774, 138970, 136922, 138968, 136920, 130780, 136924, 136930, 130784, 138976, 138982, 130790, 138986, 136938, 130804, 138996, 136948, 136952]
2024-10-25 03:37:33,895 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - create reader for partition: PartitionDefinition{database='DB01', table='nyc_taxi_fare_data', beAddress='doris-be-03:9060', tabletIds=[136962, 138910, 136860, 138914, 130722, 138918, 130724, 138916, 136872, 130730, 136878, 138924, 130734, 138934, 130742, 136890, 138942, 130748, 138946, 136896, 136900, 130758, 130760, 138952, 136904, 130762, 138958, 130764, 130768, 138960, 136912, 138966, 130774, 138970, 136922, 138968, 136920, 130780, 136924, 136930, 130784, 138976, 138982, 130790, 138986, 136938, 130804, 138996, 136948, 136952]}
2024-10-25 03:37:33,896 INFO  org.apache.doris.flink.backend.BackendClient                 [] - Success connect to Doris BE{host='doris-be-03', port=9060}.
2024-10-25 03:37:33,911 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-03:9060_34,be=doris-be-03:9060,tablets=[130688, 138880, 130690, 138884, 136836, 130694, 136840, 130698, 136850, 136852, 136858, 136758, 138804, 130614, 130616, 138808, 130620, 138812, 130622, 138816, 136768, 130626, 136778, 138824, 136776, 138830, 138828, 138834, 130640, 136790, 130644, 136788, 136794, 138840, 130654, 130658, 136806, 138852, 138858, 130668, 136812, 130674, 138870, 138868, 130678, 138874, 136826, 130682, 136830, 130686]]
2024-10-25 03:37:33,911 INFO  org.apache.doris.flink.source.reader.DorisSourceReader       [] - Initialized reader state for split: DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-03:9060_34,be=doris-be-03:9060,tablets=[130688, 138880, 130690, 138884, 136836, 130694, 136840, 130698, 136850, 136852, 136858, 136758, 138804, 130614, 130616, 138808, 130620, 138812, 130622, 138816, 136768, 130626, 136778, 138824, 136776, 138830, 138828, 138834, 130640, 136790, 130644, 136788, 136794, 138840, 130654, 130658, 136806, 138852, 138858, 130668, 136812, 130674, 138870, 138868, 130678, 138874, 136826, 130682, 136830, 130686]
2024-10-25 03:37:33,911 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 3
2024-10-25 03:37:33,912 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - Handling split change SplitAddition:[[DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-03:9060_34,be=doris-be-03:9060,tablets=[130688, 138880, 130690, 138884, 136836, 130694, 136840, 130698, 136850, 136852, 136858, 136758, 138804, 130614, 130616, 138808, 130620, 138812, 130622, 138816, 136768, 130626, 136778, 138824, 136776, 138830, 138828, 138834, 130640, 136790, 130644, 136788, 136794, 138840, 130654, 130658, 136806, 138852, 138858, 130668, 136812, 130674, 138870, 138868, 130678, 138874, 136826, 130682, 136830, 130686]]]
2024-10-25 03:37:33,912 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - Fetch a new split DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-03:9060_34,be=doris-be-03:9060,tablets=[130688, 138880, 130690, 138884, 136836, 130694, 136840, 130698, 136850, 136852, 136858, 136758, 138804, 130614, 130616, 138808, 130620, 138812, 130622, 138816, 136768, 130626, 136778, 138824, 136776, 138830, 138828, 138834, 130640, 136790, 130644, 136788, 136794, 138840, 130654, 130658, 136806, 138852, 138858, 130668, 136812, 130674, 138870, 138868, 130678, 138874, 136826, 130682, 136830, 130686]
2024-10-25 03:37:33,912 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - create reader for partition: PartitionDefinition{database='DB01', table='nyc_taxi_fare_data', beAddress='doris-be-03:9060', tabletIds=[130688, 138880, 130690, 138884, 136836, 130694, 136840, 130698, 136850, 136852, 136858, 136758, 138804, 130614, 130616, 138808, 130620, 138812, 130622, 138816, 136768, 130626, 136778, 138824, 136776, 138830, 138828, 138834, 130640, 136790, 130644, 136788, 136794, 138840, 130654, 130658, 136806, 138852, 138858, 130668, 136812, 130674, 138870, 138868, 130678, 138874, 136826, 130682, 136830, 130686]}
2024-10-25 03:37:33,913 INFO  org.apache.doris.flink.backend.BackendClient                 [] - Success connect to Doris BE{host='doris-be-03', port=9060}.
2024-10-25 03:37:33,922 INFO  org.apache.doris.flink.backend.BackendClient                 [] - OpenScanner success for Doris BE 'Doris BE{host='doris-be-03', port=9060}' with contextId '36279ae1-196a-43ec-b082-103b568fc845' for tablets '[136962, 138910, 136860, 138914, 130722, 138918, 130724, 138916, 136872, 130730, 136878, 138924, 130734, 138934, 130742, 136890, 138942, 130748, 138946, 136896, 136900, 130758, 130760, 138952, 136904, 130762, 138958, 130764, 130768, 138960, 136912, 138966, 130774, 138970, 136922, 138968, 136920, 130780, 136924, 136930, 130784, 138976, 138982, 130790, 138986, 136938, 130804, 138996, 136948, 136952]'.
2024-10-25 03:37:33,968 INFO  org.apache.doris.flink.backend.BackendClient                 [] - OpenScanner success for Doris BE 'Doris BE{host='doris-be-03', port=9060}' with contextId '46893f19-1395-4c7d-8863-f6aa6af18952' for tablets '[130688, 138880, 130690, 138884, 136836, 130694, 136840, 130698, 136850, 136852, 136858, 136758, 138804, 130614, 130616, 138808, 130620, 138812, 130622, 138816, 136768, 130626, 136778, 138824, 136776, 138830, 138828, 138834, 130640, 136790, 130644, 136788, 136794, 138840, 130654, 130658, 136806, 138852, 138858, 130668, 136812, 130674, 138870, 138868, 130678, 138874, 136826, 130682, 136830, 130686]'.
2024-10-25 03:37:34,044 INFO  org.apache.doris.flink.source.reader.DorisValueReader        [] - Scan finished, tablets: [136962, 138910, 136860, 138914, 130722, 138918, 130724, 138916, 136872, 130730, 136878, 138924, 130734, 138934, 130742, 136890, 138942, 130748, 138946, 136896, 136900, 130758, 130760, 138952, 136904, 130762, 138958, 130764, 130768, 138960, 136912, 138966, 130774, 138970, 136922, 138968, 136920, 130780, 136924, 136930, 130784, 138976, 138982, 130790, 138986, 136938, 130804, 138996, 136948, 136952], offset: 2317
2024-10-25 03:37:34,046 INFO  org.apache.doris.flink.backend.BackendClient                 [] - CloseScanner to Doris BE 'Doris BE{host='doris-be-03', port=9060}' success for contextId 36279ae1-196a-43ec-b082-103b568fc845 
2024-10-25 03:37:34,046 INFO  org.apache.doris.flink.backend.BackendClient                 [] - Closed a connection to Doris BE{host='doris-be-03', port=9060}.
2024-10-25 03:37:34,047 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished reading from splits [doris-be-03:9060_35]
2024-10-25 03:37:34,050 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Closing splitFetcher 4 because it is idle.
2024-10-25 03:37:34,050 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Shutting down split fetcher 4
2024-10-25 03:37:34,051 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split fetcher 4 exited.
2024-10-25 03:37:34,051 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished reading split(s) [doris-be-03:9060_35]
2024-10-25 03:37:34,068 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-03:9060_28,be=doris-be-03:9060,tablets=[136192, 138240, 138172, 136124, 129984, 140224, 129986, 140230, 140228, 138186, 136138, 140238, 138190, 140242, 136146, 140240, 138192, 136144, 130002, 136150, 130004, 140244, 136148, 138202, 130010, 138206, 130012, 140252, 136156, 130014, 140258, 140256, 130018, 140262, 138214, 136166, 130020, 140260, 138212, 130022, 140266, 136170, 140264, 130028, 136172, 130030, 136180, 130038, 140282, 130046]]
2024-10-25 03:37:34,068 INFO  org.apache.doris.flink.source.reader.DorisSourceReader       [] - Initialized reader state for split: DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-03:9060_28,be=doris-be-03:9060,tablets=[136192, 138240, 138172, 136124, 129984, 140224, 129986, 140230, 140228, 138186, 136138, 140238, 138190, 140242, 136146, 140240, 138192, 136144, 130002, 136150, 130004, 140244, 136148, 138202, 130010, 138206, 130012, 140252, 136156, 130014, 140258, 140256, 130018, 140262, 138214, 136166, 130020, 140260, 138212, 130022, 140266, 136170, 140264, 130028, 136172, 130030, 136180, 130038, 140282, 130046]
2024-10-25 03:37:34,068 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 5
2024-10-25 03:37:34,069 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - Handling split change SplitAddition:[[DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-03:9060_28,be=doris-be-03:9060,tablets=[136192, 138240, 138172, 136124, 129984, 140224, 129986, 140230, 140228, 138186, 136138, 140238, 138190, 140242, 136146, 140240, 138192, 136144, 130002, 136150, 130004, 140244, 136148, 138202, 130010, 138206, 130012, 140252, 136156, 130014, 140258, 140256, 130018, 140262, 138214, 136166, 130020, 140260, 138212, 130022, 140266, 136170, 140264, 130028, 136172, 130030, 136180, 130038, 140282, 130046]]]
2024-10-25 03:37:34,069 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - Fetch a new split DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-03:9060_28,be=doris-be-03:9060,tablets=[136192, 138240, 138172, 136124, 129984, 140224, 129986, 140230, 140228, 138186, 136138, 140238, 138190, 140242, 136146, 140240, 138192, 136144, 130002, 136150, 130004, 140244, 136148, 138202, 130010, 138206, 130012, 140252, 136156, 130014, 140258, 140256, 130018, 140262, 138214, 136166, 130020, 140260, 138212, 130022, 140266, 136170, 140264, 130028, 136172, 130030, 136180, 130038, 140282, 130046]
2024-10-25 03:37:34,069 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - create reader for partition: PartitionDefinition{database='DB01', table='nyc_taxi_fare_data', beAddress='doris-be-03:9060', tabletIds=[136192, 138240, 138172, 136124, 129984, 140224, 129986, 140230, 140228, 138186, 136138, 140238, 138190, 140242, 136146, 140240, 138192, 136144, 130002, 136150, 130004, 140244, 136148, 138202, 130010, 138206, 130012, 140252, 136156, 130014, 140258, 140256, 130018, 140262, 138214, 136166, 130020, 140260, 138212, 130022, 140266, 136170, 140264, 130028, 136172, 130030, 136180, 130038, 140282, 130046]}
2024-10-25 03:37:34,070 INFO  org.apache.doris.flink.backend.BackendClient                 [] - Success connect to Doris BE{host='doris-be-03', port=9060}.
2024-10-25 03:37:34,090 INFO  org.apache.doris.flink.source.reader.DorisValueReader        [] - Scan finished, tablets: [130688, 138880, 130690, 138884, 136836, 130694, 136840, 130698, 136850, 136852, 136858, 136758, 138804, 130614, 130616, 138808, 130620, 138812, 130622, 138816, 136768, 130626, 136778, 138824, 136776, 138830, 138828, 138834, 130640, 136790, 130644, 136788, 136794, 138840, 130654, 130658, 136806, 138852, 138858, 130668, 136812, 130674, 138870, 138868, 130678, 138874, 136826, 130682, 136830, 130686], offset: 7313
2024-10-25 03:37:34,092 INFO  org.apache.doris.flink.backend.BackendClient                 [] - OpenScanner success for Doris BE 'Doris BE{host='doris-be-03', port=9060}' with contextId '65ef3d00-3caa-4a56-822c-cbb03323c2ef' for tablets '[136192, 138240, 138172, 136124, 129984, 140224, 129986, 140230, 140228, 138186, 136138, 140238, 138190, 140242, 136146, 140240, 138192, 136144, 130002, 136150, 130004, 140244, 136148, 138202, 130010, 138206, 130012, 140252, 136156, 130014, 140258, 140256, 130018, 140262, 138214, 136166, 130020, 140260, 138212, 130022, 140266, 136170, 140264, 130028, 136172, 130030, 136180, 130038, 140282, 130046]'.
2024-10-25 03:37:34,093 INFO  org.apache.doris.flink.source.reader.DorisValueReader        [] - Scan finished, tablets: [136192, 138240, 138172, 136124, 129984, 140224, 129986, 140230, 140228, 138186, 136138, 140238, 138190, 140242, 136146, 140240, 138192, 136144, 130002, 136150, 130004, 140244, 136148, 138202, 130010, 138206, 130012, 140252, 136156, 130014, 140258, 140256, 130018, 140262, 138214, 136166, 130020, 140260, 138212, 130022, 140266, 136170, 140264, 130028, 136172, 130030, 136180, 130038, 140282, 130046], offset: 0
2024-10-25 03:37:34,097 INFO  org.apache.doris.flink.backend.BackendClient                 [] - CloseScanner to Doris BE 'Doris BE{host='doris-be-03', port=9060}' success for contextId 65ef3d00-3caa-4a56-822c-cbb03323c2ef 
2024-10-25 03:37:34,098 INFO  org.apache.doris.flink.backend.BackendClient                 [] - Closed a connection to Doris BE{host='doris-be-03', port=9060}.
2024-10-25 03:37:34,098 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished reading from splits [doris-be-03:9060_28]
2024-10-25 03:37:34,098 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished reading split(s) [doris-be-03:9060_28]
2024-10-25 03:37:34,099 INFO  org.apache.doris.flink.backend.BackendClient                 [] - CloseScanner to Doris BE 'Doris BE{host='doris-be-03', port=9060}' success for contextId 46893f19-1395-4c7d-8863-f6aa6af18952 
2024-10-25 03:37:34,099 INFO  org.apache.doris.flink.backend.BackendClient                 [] - Closed a connection to Doris BE{host='doris-be-03', port=9060}.
2024-10-25 03:37:34,099 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished reading from splits [doris-be-03:9060_34]
2024-10-25 03:37:34,099 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Closing splitFetcher 5 because it is idle.
2024-10-25 03:37:34,100 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Shutting down split fetcher 5
2024-10-25 03:37:34,100 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split fetcher 5 exited.
2024-10-25 03:37:34,099 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished reading split(s) [doris-be-03:9060_34]
2024-10-25 03:37:34,101 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Closing splitFetcher 3 because it is idle.
2024-10-25 03:37:34,101 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Shutting down split fetcher 3
2024-10-25 03:37:34,101 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split fetcher 3 exited.
2024-10-25 03:37:34,113 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-03:9060_24,be=doris-be-03:9060,tablets=[139906, 135814, 139910, 137860, 139908, 137866, 127626, 139918, 135820, 139916, 137874, 135830, 139926, 137882, 137880, 137886, 135836, 139938, 135846, 137894, 137896, 135854, 135852, 139954, 135856, 135862, 137918, 135872, 137920, 137926, 137924, 137928, 139976, 135890, 137938, 137936, 139984, 135894, 137942, 137946, 135896, 137944, 135902, 139998, 137948, 129758, 135904, 137952, 137852, 139900]]
2024-10-25 03:37:34,113 INFO  org.apache.doris.flink.source.reader.DorisSourceReader       [] - Initialized reader state for split: DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-03:9060_24,be=doris-be-03:9060,tablets=[139906, 135814, 139910, 137860, 139908, 137866, 127626, 139918, 135820, 139916, 137874, 135830, 139926, 137882, 137880, 137886, 135836, 139938, 135846, 137894, 137896, 135854, 135852, 139954, 135856, 135862, 137918, 135872, 137920, 137926, 137924, 137928, 139976, 135890, 137938, 137936, 139984, 135894, 137942, 137946, 135896, 137944, 135902, 139998, 137948, 129758, 135904, 137952, 137852, 139900]
2024-10-25 03:37:34,114 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 6
2024-10-25 03:37:34,122 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - Handling split change SplitAddition:[[DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-03:9060_24,be=doris-be-03:9060,tablets=[139906, 135814, 139910, 137860, 139908, 137866, 127626, 139918, 135820, 139916, 137874, 135830, 139926, 137882, 137880, 137886, 135836, 139938, 135846, 137894, 137896, 135854, 135852, 139954, 135856, 135862, 137918, 135872, 137920, 137926, 137924, 137928, 139976, 135890, 137938, 137936, 139984, 135894, 137942, 137946, 135896, 137944, 135902, 139998, 137948, 129758, 135904, 137952, 137852, 139900]]]
2024-10-25 03:37:34,122 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - Fetch a new split DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-03:9060_24,be=doris-be-03:9060,tablets=[139906, 135814, 139910, 137860, 139908, 137866, 127626, 139918, 135820, 139916, 137874, 135830, 139926, 137882, 137880, 137886, 135836, 139938, 135846, 137894, 137896, 135854, 135852, 139954, 135856, 135862, 137918, 135872, 137920, 137926, 137924, 137928, 139976, 135890, 137938, 137936, 139984, 135894, 137942, 137946, 135896, 137944, 135902, 139998, 137948, 129758, 135904, 137952, 137852, 139900]
2024-10-25 03:37:34,123 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - create reader for partition: PartitionDefinition{database='DB01', table='nyc_taxi_fare_data', beAddress='doris-be-03:9060', tabletIds=[139906, 135814, 139910, 137860, 139908, 137866, 127626, 139918, 135820, 139916, 137874, 135830, 139926, 137882, 137880, 137886, 135836, 139938, 135846, 137894, 137896, 135854, 135852, 139954, 135856, 135862, 137918, 135872, 137920, 137926, 137924, 137928, 139976, 135890, 137938, 137936, 139984, 135894, 137942, 137946, 135896, 137944, 135902, 139998, 137948, 129758, 135904, 137952, 137852, 139900]}
2024-10-25 03:37:34,123 INFO  org.apache.doris.flink.backend.BackendClient                 [] - Success connect to Doris BE{host='doris-be-03', port=9060}.
2024-10-25 03:37:34,128 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-03:9060_23,be=doris-be-03:9060,tablets=[127542, 127544, 139838, 135740, 139836, 127550, 137794, 139842, 139840, 127554, 139846, 127556, 135754, 137802, 139850, 127562, 135758, 139852, 137810, 139858, 127568, 135760, 137808, 135766, 127576, 139864, 137822, 139870, 127580, 135772, 137820, 127582, 135776, 139872, 137830, 139878, 127588, 127590, 127596, 127598, 139890, 139888, 127602, 127604, 135796, 137844, 137850, 137848, 127612, 135804]]
2024-10-25 03:37:34,129 INFO  org.apache.doris.flink.source.reader.DorisSourceReader       [] - Initialized reader state for split: DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-03:9060_23,be=doris-be-03:9060,tablets=[127542, 127544, 139838, 135740, 139836, 127550, 137794, 139842, 139840, 127554, 139846, 127556, 135754, 137802, 139850, 127562, 135758, 139852, 137810, 139858, 127568, 135760, 137808, 135766, 127576, 139864, 137822, 139870, 127580, 135772, 137820, 127582, 135776, 139872, 137830, 139878, 127588, 127590, 127596, 127598, 139890, 139888, 127602, 127604, 135796, 137844, 137850, 137848, 127612, 135804]
2024-10-25 03:37:34,129 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 4
2024-10-25 03:37:34,129 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - Handling split change SplitAddition:[[DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-03:9060_23,be=doris-be-03:9060,tablets=[127542, 127544, 139838, 135740, 139836, 127550, 137794, 139842, 139840, 127554, 139846, 127556, 135754, 137802, 139850, 127562, 135758, 139852, 137810, 139858, 127568, 135760, 137808, 135766, 127576, 139864, 137822, 139870, 127580, 135772, 137820, 127582, 135776, 139872, 137830, 139878, 127588, 127590, 127596, 127598, 139890, 139888, 127602, 127604, 135796, 137844, 137850, 137848, 127612, 135804]]]
2024-10-25 03:37:34,129 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - Fetch a new split DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-03:9060_23,be=doris-be-03:9060,tablets=[127542, 127544, 139838, 135740, 139836, 127550, 137794, 139842, 139840, 127554, 139846, 127556, 135754, 137802, 139850, 127562, 135758, 139852, 137810, 139858, 127568, 135760, 137808, 135766, 127576, 139864, 137822, 139870, 127580, 135772, 137820, 127582, 135776, 139872, 137830, 139878, 127588, 127590, 127596, 127598, 139890, 139888, 127602, 127604, 135796, 137844, 137850, 137848, 127612, 135804]
2024-10-25 03:37:34,130 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - create reader for partition: PartitionDefinition{database='DB01', table='nyc_taxi_fare_data', beAddress='doris-be-03:9060', tabletIds=[127542, 127544, 139838, 135740, 139836, 127550, 137794, 139842, 139840, 127554, 139846, 127556, 135754, 137802, 139850, 127562, 135758, 139852, 137810, 139858, 127568, 135760, 137808, 135766, 127576, 139864, 137822, 139870, 127580, 135772, 137820, 127582, 135776, 139872, 137830, 139878, 127588, 127590, 127596, 127598, 139890, 139888, 127602, 127604, 135796, 137844, 137850, 137848, 127612, 135804]}
2024-10-25 03:37:34,130 INFO  org.apache.doris.flink.backend.BackendClient                 [] - Success connect to Doris BE{host='doris-be-03', port=9060}.
2024-10-25 03:37:34,135 INFO  org.apache.doris.flink.backend.BackendClient                 [] - OpenScanner success for Doris BE 'Doris BE{host='doris-be-03', port=9060}' with contextId '87bce1a0-0c8b-41f2-89ec-fe7f49a8a7b6' for tablets '[139906, 135814, 139910, 137860, 139908, 137866, 127626, 139918, 135820, 139916, 137874, 135830, 139926, 137882, 137880, 137886, 135836, 139938, 135846, 137894, 137896, 135854, 135852, 139954, 135856, 135862, 137918, 135872, 137920, 137926, 137924, 137928, 139976, 135890, 137938, 137936, 139984, 135894, 137942, 137946, 135896, 137944, 135902, 139998, 137948, 129758, 135904, 137952, 137852, 139900]'.
2024-10-25 03:37:34,136 INFO  org.apache.doris.flink.source.reader.DorisValueReader        [] - Scan finished, tablets: [139906, 135814, 139910, 137860, 139908, 137866, 127626, 139918, 135820, 139916, 137874, 135830, 139926, 137882, 137880, 137886, 135836, 139938, 135846, 137894, 137896, 135854, 135852, 139954, 135856, 135862, 137918, 135872, 137920, 137926, 137924, 137928, 139976, 135890, 137938, 137936, 139984, 135894, 137942, 137946, 135896, 137944, 135902, 139998, 137948, 129758, 135904, 137952, 137852, 139900], offset: 0
2024-10-25 03:37:34,137 INFO  org.apache.doris.flink.backend.BackendClient                 [] - CloseScanner to Doris BE 'Doris BE{host='doris-be-03', port=9060}' success for contextId 87bce1a0-0c8b-41f2-89ec-fe7f49a8a7b6 
2024-10-25 03:37:34,137 INFO  org.apache.doris.flink.backend.BackendClient                 [] - Closed a connection to Doris BE{host='doris-be-03', port=9060}.
2024-10-25 03:37:34,137 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished reading split(s) [doris-be-03:9060_24]
2024-10-25 03:37:34,138 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished reading from splits [doris-be-03:9060_24]
2024-10-25 03:37:34,142 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Closing splitFetcher 6 because it is idle.
2024-10-25 03:37:34,142 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Shutting down split fetcher 6
2024-10-25 03:37:34,142 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split fetcher 6 exited.
2024-10-25 03:37:34,153 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-03:9060_21,be=doris-be-03:9060,tablets=[137602, 137600, 135552, 137606, 137608, 135560, 137614, 137612, 127376, 135568, 127378, 135572, 127382, 137626, 135578, 127384, 127388, 135584, 127394, 135590, 137636, 135592, 137646, 137644, 127406, 137648, 127410, 127412, 135614, 127420, 135612, 137666, 135618, 135620, 135624, 137678, 135630, 127436, 127438, 135634, 135638, 127444, 135636, 127446, 135642, 137694, 135644, 127454, 135650, 127458]]
2024-10-25 03:37:34,154 INFO  org.apache.doris.flink.source.reader.DorisSourceReader       [] - Initialized reader state for split: DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-03:9060_21,be=doris-be-03:9060,tablets=[137602, 137600, 135552, 137606, 137608, 135560, 137614, 137612, 127376, 135568, 127378, 135572, 127382, 137626, 135578, 127384, 127388, 135584, 127394, 135590, 137636, 135592, 137646, 137644, 127406, 137648, 127410, 127412, 135614, 127420, 135612, 137666, 135618, 135620, 135624, 137678, 135630, 127436, 127438, 135634, 135638, 127444, 135636, 127446, 135642, 137694, 135644, 127454, 135650, 127458]
2024-10-25 03:37:34,156 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 7
2024-10-25 03:37:34,159 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - Handling split change SplitAddition:[[DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-03:9060_21,be=doris-be-03:9060,tablets=[137602, 137600, 135552, 137606, 137608, 135560, 137614, 137612, 127376, 135568, 127378, 135572, 127382, 137626, 135578, 127384, 127388, 135584, 127394, 135590, 137636, 135592, 137646, 137644, 127406, 137648, 127410, 127412, 135614, 127420, 135612, 137666, 135618, 135620, 135624, 137678, 135630, 127436, 127438, 135634, 135638, 127444, 135636, 127446, 135642, 137694, 135644, 127454, 135650, 127458]]]
2024-10-25 03:37:34,159 INFO  org.apache.doris.flink.backend.BackendClient                 [] - OpenScanner success for Doris BE 'Doris BE{host='doris-be-03', port=9060}' with contextId 'd1ecf610-82ac-4cf0-bc0c-5bbcff5e27d8' for tablets '[127542, 127544, 139838, 135740, 139836, 127550, 137794, 139842, 139840, 127554, 139846, 127556, 135754, 137802, 139850, 127562, 135758, 139852, 137810, 139858, 127568, 135760, 137808, 135766, 127576, 139864, 137822, 139870, 127580, 135772, 137820, 127582, 135776, 139872, 137830, 139878, 127588, 127590, 127596, 127598, 139890, 139888, 127602, 127604, 135796, 137844, 137850, 137848, 127612, 135804]'.
2024-10-25 03:37:34,159 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - Fetch a new split DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-03:9060_21,be=doris-be-03:9060,tablets=[137602, 137600, 135552, 137606, 137608, 135560, 137614, 137612, 127376, 135568, 127378, 135572, 127382, 137626, 135578, 127384, 127388, 135584, 127394, 135590, 137636, 135592, 137646, 137644, 127406, 137648, 127410, 127412, 135614, 127420, 135612, 137666, 135618, 135620, 135624, 137678, 135630, 127436, 127438, 135634, 135638, 127444, 135636, 127446, 135642, 137694, 135644, 127454, 135650, 127458]
2024-10-25 03:37:34,161 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - create reader for partition: PartitionDefinition{database='DB01', table='nyc_taxi_fare_data', beAddress='doris-be-03:9060', tabletIds=[137602, 137600, 135552, 137606, 137608, 135560, 137614, 137612, 127376, 135568, 127378, 135572, 127382, 137626, 135578, 127384, 127388, 135584, 127394, 135590, 137636, 135592, 137646, 137644, 127406, 137648, 127410, 127412, 135614, 127420, 135612, 137666, 135618, 135620, 135624, 137678, 135630, 127436, 127438, 135634, 135638, 127444, 135636, 127446, 135642, 137694, 135644, 127454, 135650, 127458]}
2024-10-25 03:37:34,162 INFO  org.apache.doris.flink.backend.BackendClient                 [] - Success connect to Doris BE{host='doris-be-03', port=9060}.
2024-10-25 03:37:34,164 INFO  org.apache.doris.flink.source.reader.DorisValueReader        [] - Scan finished, tablets: [127542, 127544, 139838, 135740, 139836, 127550, 137794, 139842, 139840, 127554, 139846, 127556, 135754, 137802, 139850, 127562, 135758, 139852, 137810, 139858, 127568, 135760, 137808, 135766, 127576, 139864, 137822, 139870, 127580, 135772, 137820, 127582, 135776, 139872, 137830, 139878, 127588, 127590, 127596, 127598, 139890, 139888, 127602, 127604, 135796, 137844, 137850, 137848, 127612, 135804], offset: 0
2024-10-25 03:37:34,167 INFO  org.apache.doris.flink.backend.BackendClient                 [] - CloseScanner to Doris BE 'Doris BE{host='doris-be-03', port=9060}' success for contextId d1ecf610-82ac-4cf0-bc0c-5bbcff5e27d8 
2024-10-25 03:37:34,167 INFO  org.apache.doris.flink.backend.BackendClient                 [] - Closed a connection to Doris BE{host='doris-be-03', port=9060}.
2024-10-25 03:37:34,167 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished reading from splits [doris-be-03:9060_23]
2024-10-25 03:37:34,167 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished reading split(s) [doris-be-03:9060_23]
2024-10-25 03:37:34,168 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Closing splitFetcher 4 because it is idle.
2024-10-25 03:37:34,168 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Shutting down split fetcher 4
2024-10-25 03:37:34,168 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split fetcher 4 exited.
2024-10-25 03:37:34,175 INFO  org.apache.doris.flink.backend.BackendClient                 [] - OpenScanner success for Doris BE 'Doris BE{host='doris-be-03', port=9060}' with contextId '72fa026c-d267-4cf9-8491-fe551c5496df' for tablets '[137602, 137600, 135552, 137606, 137608, 135560, 137614, 137612, 127376, 135568, 127378, 135572, 127382, 137626, 135578, 127384, 127388, 135584, 127394, 135590, 137636, 135592, 137646, 137644, 127406, 137648, 127410, 127412, 135614, 127420, 135612, 137666, 135618, 135620, 135624, 137678, 135630, 127436, 127438, 135634, 135638, 127444, 135636, 127446, 135642, 137694, 135644, 127454, 135650, 127458]'.
2024-10-25 03:37:34,184 INFO  org.apache.doris.flink.source.reader.DorisValueReader        [] - Scan finished, tablets: [137602, 137600, 135552, 137606, 137608, 135560, 137614, 137612, 127376, 135568, 127378, 135572, 127382, 137626, 135578, 127384, 127388, 135584, 127394, 135590, 137636, 135592, 137646, 137644, 127406, 137648, 127410, 127412, 135614, 127420, 135612, 137666, 135618, 135620, 135624, 137678, 135630, 127436, 127438, 135634, 135638, 127444, 135636, 127446, 135642, 137694, 135644, 127454, 135650, 127458], offset: 0
2024-10-25 03:37:34,192 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-02:9060_16,be=doris-be-02:9060,tablets=[139142, 136970, 135050, 139146, 136974, 139022, 135054, 137102, 135052, 137106, 139158, 139028, 137108, 139156, 135066, 135064, 137118, 136988, 139040, 136998, 139044, 137000, 137010, 139058, 137008, 137014, 137018, 139066, 137016, 137020, 139078, 139076, 137032, 137038, 139092, 139098, 137048, 139102, 137056, 139104, 139108, 137064, 139118, 139122, 137078, 139126, 139124, 137080, 139128, 139132]]
2024-10-25 03:37:34,192 INFO  org.apache.doris.flink.source.reader.DorisSourceReader       [] - Initialized reader state for split: DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-02:9060_16,be=doris-be-02:9060,tablets=[139142, 136970, 135050, 139146, 136974, 139022, 135054, 137102, 135052, 137106, 139158, 139028, 137108, 139156, 135066, 135064, 137118, 136988, 139040, 136998, 139044, 137000, 137010, 139058, 137008, 137014, 137018, 139066, 137016, 137020, 139078, 139076, 137032, 137038, 139092, 139098, 137048, 139102, 137056, 139104, 139108, 137064, 139118, 139122, 137078, 139126, 139124, 137080, 139128, 139132]
2024-10-25 03:37:34,193 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 5
2024-10-25 03:37:34,193 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - Handling split change SplitAddition:[[DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-02:9060_16,be=doris-be-02:9060,tablets=[139142, 136970, 135050, 139146, 136974, 139022, 135054, 137102, 135052, 137106, 139158, 139028, 137108, 139156, 135066, 135064, 137118, 136988, 139040, 136998, 139044, 137000, 137010, 139058, 137008, 137014, 137018, 139066, 137016, 137020, 139078, 139076, 137032, 137038, 139092, 139098, 137048, 139102, 137056, 139104, 139108, 137064, 139118, 139122, 137078, 139126, 139124, 137080, 139128, 139132]]]
2024-10-25 03:37:34,193 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - Fetch a new split DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-02:9060_16,be=doris-be-02:9060,tablets=[139142, 136970, 135050, 139146, 136974, 139022, 135054, 137102, 135052, 137106, 139158, 139028, 137108, 139156, 135066, 135064, 137118, 136988, 139040, 136998, 139044, 137000, 137010, 139058, 137008, 137014, 137018, 139066, 137016, 137020, 139078, 139076, 137032, 137038, 139092, 139098, 137048, 139102, 137056, 139104, 139108, 137064, 139118, 139122, 137078, 139126, 139124, 137080, 139128, 139132]
2024-10-25 03:37:34,193 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - create reader for partition: PartitionDefinition{database='DB01', table='nyc_taxi_fare_data', beAddress='doris-be-02:9060', tabletIds=[139142, 136970, 135050, 139146, 136974, 139022, 135054, 137102, 135052, 137106, 139158, 139028, 137108, 139156, 135066, 135064, 137118, 136988, 139040, 136998, 139044, 137000, 137010, 139058, 137008, 137014, 137018, 139066, 137016, 137020, 139078, 139076, 137032, 137038, 139092, 139098, 137048, 139102, 137056, 139104, 139108, 137064, 139118, 139122, 137078, 139126, 139124, 137080, 139128, 139132]}
2024-10-25 03:37:34,195 INFO  org.apache.doris.flink.backend.BackendClient                 [] - Success connect to Doris BE{host='doris-be-02', port=9060}.
2024-10-25 03:37:34,197 INFO  org.apache.doris.flink.backend.BackendClient                 [] - CloseScanner to Doris BE 'Doris BE{host='doris-be-03', port=9060}' success for contextId 72fa026c-d267-4cf9-8491-fe551c5496df 
2024-10-25 03:37:34,197 INFO  org.apache.doris.flink.backend.BackendClient                 [] - Closed a connection to Doris BE{host='doris-be-03', port=9060}.
2024-10-25 03:37:34,197 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished reading from splits [doris-be-03:9060_21]
2024-10-25 03:37:34,198 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished reading split(s) [doris-be-03:9060_21]
2024-10-25 03:37:34,198 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Closing splitFetcher 7 because it is idle.
2024-10-25 03:37:34,198 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Shutting down split fetcher 7
2024-10-25 03:37:34,205 INFO  org.apache.doris.flink.backend.BackendClient                 [] - OpenScanner success for Doris BE 'Doris BE{host='doris-be-02', port=9060}' with contextId '456bc908-2c6c-4718-b747-7fd532e19115' for tablets '[139142, 136970, 135050, 139146, 136974, 139022, 135054, 137102, 135052, 137106, 139158, 139028, 137108, 139156, 135066, 135064, 137118, 136988, 139040, 136998, 139044, 137000, 137010, 139058, 137008, 137014, 137018, 139066, 137016, 137020, 139078, 139076, 137032, 137038, 139092, 139098, 137048, 139102, 137056, 139104, 139108, 137064, 139118, 139122, 137078, 139126, 139124, 137080, 139128, 139132]'.
2024-10-25 03:37:34,199 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split fetcher 7 exited.
2024-10-25 03:37:34,218 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-02:9060_13,be=doris-be-02:9060,tablets=[130562, 130566, 130568, 136712, 136716, 130574, 136722, 138774, 138772, 136730, 138778, 136728, 136734, 130588, 136736, 138790, 138694, 138692, 136650, 130504, 136648, 138696, 138700, 138704, 136662, 138710, 136660, 130522, 136670, 138716, 136674, 138722, 136678, 138726, 136680, 138728, 136686, 136684, 138732, 130542, 136690, 130544, 136688, 130546, 138742, 130550, 138746, 138744, 138750, 130558]]
2024-10-25 03:37:34,218 INFO  org.apache.doris.flink.source.reader.DorisSourceReader       [] - Initialized reader state for split: DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-02:9060_13,be=doris-be-02:9060,tablets=[130562, 130566, 130568, 136712, 136716, 130574, 136722, 138774, 138772, 136730, 138778, 136728, 136734, 130588, 136736, 138790, 138694, 138692, 136650, 130504, 136648, 138696, 138700, 138704, 136662, 138710, 136660, 130522, 136670, 138716, 136674, 138722, 136678, 138726, 136680, 138728, 136686, 136684, 138732, 130542, 136690, 130544, 136688, 130546, 138742, 130550, 138746, 138744, 138750, 130558]
2024-10-25 03:37:34,218 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 8
2024-10-25 03:37:34,219 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - Handling split change SplitAddition:[[DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-02:9060_13,be=doris-be-02:9060,tablets=[130562, 130566, 130568, 136712, 136716, 130574, 136722, 138774, 138772, 136730, 138778, 136728, 136734, 130588, 136736, 138790, 138694, 138692, 136650, 130504, 136648, 138696, 138700, 138704, 136662, 138710, 136660, 130522, 136670, 138716, 136674, 138722, 136678, 138726, 136680, 138728, 136686, 136684, 138732, 130542, 136690, 130544, 136688, 130546, 138742, 130550, 138746, 138744, 138750, 130558]]]
2024-10-25 03:37:34,219 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - Fetch a new split DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-02:9060_13,be=doris-be-02:9060,tablets=[130562, 130566, 130568, 136712, 136716, 130574, 136722, 138774, 138772, 136730, 138778, 136728, 136734, 130588, 136736, 138790, 138694, 138692, 136650, 130504, 136648, 138696, 138700, 138704, 136662, 138710, 136660, 130522, 136670, 138716, 136674, 138722, 136678, 138726, 136680, 138728, 136686, 136684, 138732, 130542, 136690, 130544, 136688, 130546, 138742, 130550, 138746, 138744, 138750, 130558]
2024-10-25 03:37:34,219 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - create reader for partition: PartitionDefinition{database='DB01', table='nyc_taxi_fare_data', beAddress='doris-be-02:9060', tabletIds=[130562, 130566, 130568, 136712, 136716, 130574, 136722, 138774, 138772, 136730, 138778, 136728, 136734, 130588, 136736, 138790, 138694, 138692, 136650, 130504, 136648, 138696, 138700, 138704, 136662, 138710, 136660, 130522, 136670, 138716, 136674, 138722, 136678, 138726, 136680, 138728, 136686, 136684, 138732, 130542, 136690, 130544, 136688, 130546, 138742, 130550, 138746, 138744, 138750, 130558]}
2024-10-25 03:37:34,220 INFO  org.apache.doris.flink.backend.BackendClient                 [] - Success connect to Doris BE{host='doris-be-02', port=9060}.
2024-10-25 03:37:34,265 INFO  org.apache.doris.flink.backend.BackendClient                 [] - OpenScanner success for Doris BE 'Doris BE{host='doris-be-02', port=9060}' with contextId '6642825a-d7d9-4eff-a2dd-4573737777c9' for tablets '[130562, 130566, 130568, 136712, 136716, 130574, 136722, 138774, 138772, 136730, 138778, 136728, 136734, 130588, 136736, 138790, 138694, 138692, 136650, 130504, 136648, 138696, 138700, 138704, 136662, 138710, 136660, 130522, 136670, 138716, 136674, 138722, 136678, 138726, 136680, 138728, 136686, 136684, 138732, 130542, 136690, 130544, 136688, 130546, 138742, 130550, 138746, 138744, 138750, 130558]'.
2024-10-25 03:37:34,400 INFO  org.apache.doris.flink.source.reader.DorisValueReader        [] - Scan finished, tablets: [139142, 136970, 135050, 139146, 136974, 139022, 135054, 137102, 135052, 137106, 139158, 139028, 137108, 139156, 135066, 135064, 137118, 136988, 139040, 136998, 139044, 137000, 137010, 139058, 137008, 137014, 137018, 139066, 137016, 137020, 139078, 139076, 137032, 137038, 139092, 139098, 137048, 139102, 137056, 139104, 139108, 137064, 139118, 139122, 137078, 139126, 139124, 137080, 139128, 139132], offset: 13544
2024-10-25 03:37:34,404 INFO  org.apache.doris.flink.backend.BackendClient                 [] - CloseScanner to Doris BE 'Doris BE{host='doris-be-02', port=9060}' success for contextId 456bc908-2c6c-4718-b747-7fd532e19115 
2024-10-25 03:37:34,405 INFO  org.apache.doris.flink.backend.BackendClient                 [] - Closed a connection to Doris BE{host='doris-be-02', port=9060}.
2024-10-25 03:37:34,405 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished reading from splits [doris-be-02:9060_16]
2024-10-25 03:37:34,406 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Closing splitFetcher 5 because it is idle.
2024-10-25 03:37:34,406 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Shutting down split fetcher 5
2024-10-25 03:37:34,406 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split fetcher 5 exited.
2024-10-25 03:37:34,406 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished reading split(s) [doris-be-02:9060_16]
2024-10-25 03:37:34,407 INFO  org.apache.doris.flink.source.reader.DorisValueReader        [] - Scan finished, tablets: [130562, 130566, 130568, 136712, 136716, 130574, 136722, 138774, 138772, 136730, 138778, 136728, 136734, 130588, 136736, 138790, 138694, 138692, 136650, 130504, 136648, 138696, 138700, 138704, 136662, 138710, 136660, 130522, 136670, 138716, 136674, 138722, 136678, 138726, 136680, 138728, 136686, 136684, 138732, 130542, 136690, 130544, 136688, 130546, 138742, 130550, 138746, 138744, 138750, 130558], offset: 9747
2024-10-25 03:37:34,408 INFO  org.apache.doris.flink.backend.BackendClient                 [] - CloseScanner to Doris BE 'Doris BE{host='doris-be-02', port=9060}' success for contextId 6642825a-d7d9-4eff-a2dd-4573737777c9 
2024-10-25 03:37:34,409 INFO  org.apache.doris.flink.backend.BackendClient                 [] - Closed a connection to Doris BE{host='doris-be-02', port=9060}.
2024-10-25 03:37:34,409 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished reading from splits [doris-be-02:9060_13]
2024-10-25 03:37:34,409 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished reading split(s) [doris-be-02:9060_13]
2024-10-25 03:37:34,409 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Closing splitFetcher 8 because it is idle.
2024-10-25 03:37:34,409 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Shutting down split fetcher 8
2024-10-25 03:37:34,409 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split fetcher 8 exited.
2024-10-25 03:37:34,420 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-02:9060_10,be=doris-be-02:9060,tablets=[130176, 136320, 138374, 130190, 136340, 138388, 130198, 138394, 130200, 136348, 138396, 130212, 130214, 130216, 136360, 136368, 136374, 130230, 130232, 138428, 130238, 138434, 138440, 130252, 130254, 138450, 130256, 138448, 130134, 138330, 136286, 136284, 138338, 136288, 136292, 130152, 136296, 130154, 136302, 130156, 130158, 136304, 138352, 130162, 136310, 138356, 130166, 136312, 138360, 130174]]
2024-10-25 03:37:34,421 INFO  org.apache.doris.flink.source.reader.DorisSourceReader       [] - Initialized reader state for split: DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-02:9060_10,be=doris-be-02:9060,tablets=[130176, 136320, 138374, 130190, 136340, 138388, 130198, 138394, 130200, 136348, 138396, 130212, 130214, 130216, 136360, 136368, 136374, 130230, 130232, 138428, 130238, 138434, 138440, 130252, 130254, 138450, 130256, 138448, 130134, 138330, 136286, 136284, 138338, 136288, 136292, 130152, 136296, 130154, 136302, 130156, 130158, 136304, 138352, 130162, 136310, 138356, 130166, 136312, 138360, 130174]
2024-10-25 03:37:34,421 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 6
2024-10-25 03:37:34,421 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - Handling split change SplitAddition:[[DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-02:9060_10,be=doris-be-02:9060,tablets=[130176, 136320, 138374, 130190, 136340, 138388, 130198, 138394, 130200, 136348, 138396, 130212, 130214, 130216, 136360, 136368, 136374, 130230, 130232, 138428, 130238, 138434, 138440, 130252, 130254, 138450, 130256, 138448, 130134, 138330, 136286, 136284, 138338, 136288, 136292, 130152, 136296, 130154, 136302, 130156, 130158, 136304, 138352, 130162, 136310, 138356, 130166, 136312, 138360, 130174]]]
2024-10-25 03:37:34,421 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - Fetch a new split DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-02:9060_10,be=doris-be-02:9060,tablets=[130176, 136320, 138374, 130190, 136340, 138388, 130198, 138394, 130200, 136348, 138396, 130212, 130214, 130216, 136360, 136368, 136374, 130230, 130232, 138428, 130238, 138434, 138440, 130252, 130254, 138450, 130256, 138448, 130134, 138330, 136286, 136284, 138338, 136288, 136292, 130152, 136296, 130154, 136302, 130156, 130158, 136304, 138352, 130162, 136310, 138356, 130166, 136312, 138360, 130174]
2024-10-25 03:37:34,421 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - create reader for partition: PartitionDefinition{database='DB01', table='nyc_taxi_fare_data', beAddress='doris-be-02:9060', tabletIds=[130176, 136320, 138374, 130190, 136340, 138388, 130198, 138394, 130200, 136348, 138396, 130212, 130214, 130216, 136360, 136368, 136374, 130230, 130232, 138428, 130238, 138434, 138440, 130252, 130254, 138450, 130256, 138448, 130134, 138330, 136286, 136284, 138338, 136288, 136292, 130152, 136296, 130154, 136302, 130156, 130158, 136304, 138352, 130162, 136310, 138356, 130166, 136312, 138360, 130174]}
2024-10-25 03:37:34,422 INFO  org.apache.doris.flink.backend.BackendClient                 [] - Success connect to Doris BE{host='doris-be-02', port=9060}.
2024-10-25 03:37:34,423 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-02:9060_9,be=doris-be-02:9060,tablets=[138242, 138244, 138248, 138254, 138252, 140310, 136212, 140314, 138266, 130072, 140318, 138270, 140316, 136220, 140326, 140324, 136228, 140330, 130088, 140328, 138286, 130094, 140338, 130096, 138288, 136244, 130102, 140346, 138296, 136256, 138304, 130114, 138318, 138322, 130128, 130130, 136278, 136174, 138220, 138226, 140272, 140278, 138230, 138228, 136186, 138232, 136190, 140286, 138238, 136188]]
2024-10-25 03:37:34,423 INFO  org.apache.doris.flink.source.reader.DorisSourceReader       [] - Initialized reader state for split: DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-02:9060_9,be=doris-be-02:9060,tablets=[138242, 138244, 138248, 138254, 138252, 140310, 136212, 140314, 138266, 130072, 140318, 138270, 140316, 136220, 140326, 140324, 136228, 140330, 130088, 140328, 138286, 130094, 140338, 130096, 138288, 136244, 130102, 140346, 138296, 136256, 138304, 130114, 138318, 138322, 130128, 130130, 136278, 136174, 138220, 138226, 140272, 140278, 138230, 138228, 136186, 138232, 136190, 140286, 138238, 136188]
2024-10-25 03:37:34,424 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 9
2024-10-25 03:37:34,424 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - Handling split change SplitAddition:[[DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-02:9060_9,be=doris-be-02:9060,tablets=[138242, 138244, 138248, 138254, 138252, 140310, 136212, 140314, 138266, 130072, 140318, 138270, 140316, 136220, 140326, 140324, 136228, 140330, 130088, 140328, 138286, 130094, 140338, 130096, 138288, 136244, 130102, 140346, 138296, 136256, 138304, 130114, 138318, 138322, 130128, 130130, 136278, 136174, 138220, 138226, 140272, 140278, 138230, 138228, 136186, 138232, 136190, 140286, 138238, 136188]]]
2024-10-25 03:37:34,424 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - Fetch a new split DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-02:9060_9,be=doris-be-02:9060,tablets=[138242, 138244, 138248, 138254, 138252, 140310, 136212, 140314, 138266, 130072, 140318, 138270, 140316, 136220, 140326, 140324, 136228, 140330, 130088, 140328, 138286, 130094, 140338, 130096, 138288, 136244, 130102, 140346, 138296, 136256, 138304, 130114, 138318, 138322, 130128, 130130, 136278, 136174, 138220, 138226, 140272, 140278, 138230, 138228, 136186, 138232, 136190, 140286, 138238, 136188]
2024-10-25 03:37:34,424 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - create reader for partition: PartitionDefinition{database='DB01', table='nyc_taxi_fare_data', beAddress='doris-be-02:9060', tabletIds=[138242, 138244, 138248, 138254, 138252, 140310, 136212, 140314, 138266, 130072, 140318, 138270, 140316, 136220, 140326, 140324, 136228, 140330, 130088, 140328, 138286, 130094, 140338, 130096, 138288, 136244, 130102, 140346, 138296, 136256, 138304, 130114, 138318, 138322, 130128, 130130, 136278, 136174, 138220, 138226, 140272, 140278, 138230, 138228, 136186, 138232, 136190, 140286, 138238, 136188]}
2024-10-25 03:37:34,425 INFO  org.apache.doris.flink.backend.BackendClient                 [] - Success connect to Doris BE{host='doris-be-02', port=9060}.
2024-10-25 03:37:34,440 INFO  org.apache.doris.flink.backend.BackendClient                 [] - OpenScanner success for Doris BE 'Doris BE{host='doris-be-02', port=9060}' with contextId '179928a0-3dc9-41ec-885c-6c8fbcbae052' for tablets '[138242, 138244, 138248, 138254, 138252, 140310, 136212, 140314, 138266, 130072, 140318, 138270, 140316, 136220, 140326, 140324, 136228, 140330, 130088, 140328, 138286, 130094, 140338, 130096, 138288, 136244, 130102, 140346, 138296, 136256, 138304, 130114, 138318, 138322, 130128, 130130, 136278, 136174, 138220, 138226, 140272, 140278, 138230, 138228, 136186, 138232, 136190, 140286, 138238, 136188]'.
2024-10-25 03:37:34,459 INFO  org.apache.doris.flink.backend.BackendClient                 [] - OpenScanner success for Doris BE 'Doris BE{host='doris-be-02', port=9060}' with contextId 'afa713d8-01e2-4cf3-b4b9-ce1c3d18504c' for tablets '[130176, 136320, 138374, 130190, 136340, 138388, 130198, 138394, 130200, 136348, 138396, 130212, 130214, 130216, 136360, 136368, 136374, 130230, 130232, 138428, 130238, 138434, 138440, 130252, 130254, 138450, 130256, 138448, 130134, 138330, 136286, 136284, 138338, 136288, 136292, 130152, 136296, 130154, 136302, 130156, 130158, 136304, 138352, 130162, 136310, 138356, 130166, 136312, 138360, 130174]'.
2024-10-25 03:37:34,462 INFO  org.apache.doris.flink.source.reader.DorisValueReader        [] - Scan finished, tablets: [138242, 138244, 138248, 138254, 138252, 140310, 136212, 140314, 138266, 130072, 140318, 138270, 140316, 136220, 140326, 140324, 136228, 140330, 130088, 140328, 138286, 130094, 140338, 130096, 138288, 136244, 130102, 140346, 138296, 136256, 138304, 130114, 138318, 138322, 130128, 130130, 136278, 136174, 138220, 138226, 140272, 140278, 138230, 138228, 136186, 138232, 136190, 140286, 138238, 136188], offset: 0
2024-10-25 03:37:34,472 INFO  org.apache.doris.flink.source.reader.DorisValueReader        [] - Scan finished, tablets: [130176, 136320, 138374, 130190, 136340, 138388, 130198, 138394, 130200, 136348, 138396, 130212, 130214, 130216, 136360, 136368, 136374, 130230, 130232, 138428, 130238, 138434, 138440, 130252, 130254, 138450, 130256, 138448, 130134, 138330, 136286, 136284, 138338, 136288, 136292, 130152, 136296, 130154, 136302, 130156, 130158, 136304, 138352, 130162, 136310, 138356, 130166, 136312, 138360, 130174], offset: 3
2024-10-25 03:37:34,472 INFO  org.apache.doris.flink.backend.BackendClient                 [] - CloseScanner to Doris BE 'Doris BE{host='doris-be-02', port=9060}' success for contextId 179928a0-3dc9-41ec-885c-6c8fbcbae052 
2024-10-25 03:37:34,472 INFO  org.apache.doris.flink.backend.BackendClient                 [] - Closed a connection to Doris BE{host='doris-be-02', port=9060}.
2024-10-25 03:37:34,473 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished reading from splits [doris-be-02:9060_9]
2024-10-25 03:37:34,473 INFO  org.apache.doris.flink.backend.BackendClient                 [] - CloseScanner to Doris BE 'Doris BE{host='doris-be-02', port=9060}' success for contextId afa713d8-01e2-4cf3-b4b9-ce1c3d18504c 
2024-10-25 03:37:34,473 INFO  org.apache.doris.flink.backend.BackendClient                 [] - Closed a connection to Doris BE{host='doris-be-02', port=9060}.
2024-10-25 03:37:34,473 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished reading from splits [doris-be-02:9060_10]
2024-10-25 03:37:34,473 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished reading split(s) [doris-be-02:9060_9]
2024-10-25 03:37:34,474 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Closing splitFetcher 9 because it is idle.
2024-10-25 03:37:34,474 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Shutting down split fetcher 9
2024-10-25 03:37:34,474 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split fetcher 9 exited.
2024-10-25 03:37:34,474 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Closing splitFetcher 6 because it is idle.
2024-10-25 03:37:34,474 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Shutting down split fetcher 6
2024-10-25 03:37:34,474 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split fetcher 6 exited.
2024-10-25 03:37:34,474 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished reading split(s) [doris-be-02:9060_10]
2024-10-25 03:37:34,484 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-02:9060_4,be=doris-be-02:9060,tablets=[137730, 137734, 127498, 135692, 137746, 127506, 137750, 127510, 135706, 127514, 137758, 127518, 127520, 135712, 127522, 127524, 135716, 127528, 137768, 127530, 137774, 135724, 127534, 127536, 137784, 139832, 127546, 135742, 137790, 127548, 127552, 137800, 139848, 137806, 127570, 137814, 139862, 127572, 135764, 139860, 139866, 137826, 127586, 135656, 135660, 135666, 135664, 135674, 127480, 135676]]
2024-10-25 03:37:34,484 INFO  org.apache.doris.flink.source.reader.DorisSourceReader       [] - Initialized reader state for split: DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-02:9060_4,be=doris-be-02:9060,tablets=[137730, 137734, 127498, 135692, 137746, 127506, 137750, 127510, 135706, 127514, 137758, 127518, 127520, 135712, 127522, 127524, 135716, 127528, 137768, 127530, 137774, 135724, 127534, 127536, 137784, 139832, 127546, 135742, 137790, 127548, 127552, 137800, 139848, 137806, 127570, 137814, 139862, 127572, 135764, 139860, 139866, 137826, 127586, 135656, 135660, 135666, 135664, 135674, 127480, 135676]
2024-10-25 03:37:34,490 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 10
2024-10-25 03:37:34,491 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - Handling split change SplitAddition:[[DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-02:9060_4,be=doris-be-02:9060,tablets=[137730, 137734, 127498, 135692, 137746, 127506, 137750, 127510, 135706, 127514, 137758, 127518, 127520, 135712, 127522, 127524, 135716, 127528, 137768, 127530, 137774, 135724, 127534, 127536, 137784, 139832, 127546, 135742, 137790, 127548, 127552, 137800, 139848, 137806, 127570, 137814, 139862, 127572, 135764, 139860, 139866, 137826, 127586, 135656, 135660, 135666, 135664, 135674, 127480, 135676]]]
2024-10-25 03:37:34,491 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - Fetch a new split DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-02:9060_4,be=doris-be-02:9060,tablets=[137730, 137734, 127498, 135692, 137746, 127506, 137750, 127510, 135706, 127514, 137758, 127518, 127520, 135712, 127522, 127524, 135716, 127528, 137768, 127530, 137774, 135724, 127534, 127536, 137784, 139832, 127546, 135742, 137790, 127548, 127552, 137800, 139848, 137806, 127570, 137814, 139862, 127572, 135764, 139860, 139866, 137826, 127586, 135656, 135660, 135666, 135664, 135674, 127480, 135676]
2024-10-25 03:37:34,491 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - create reader for partition: PartitionDefinition{database='DB01', table='nyc_taxi_fare_data', beAddress='doris-be-02:9060', tabletIds=[137730, 137734, 127498, 135692, 137746, 127506, 137750, 127510, 135706, 127514, 137758, 127518, 127520, 135712, 127522, 127524, 135716, 127528, 137768, 127530, 137774, 135724, 127534, 127536, 137784, 139832, 127546, 135742, 137790, 127548, 127552, 137800, 139848, 137806, 127570, 137814, 139862, 127572, 135764, 139860, 139866, 137826, 127586, 135656, 135660, 135666, 135664, 135674, 127480, 135676]}
2024-10-25 03:37:34,492 INFO  org.apache.doris.flink.backend.BackendClient                 [] - Success connect to Doris BE{host='doris-be-02', port=9060}.
2024-10-25 03:37:34,496 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-02:9060_2,be=doris-be-02:9060,tablets=[135426, 137474, 127234, 137478, 127236, 137476, 127238, 135432, 137486, 127244, 137484, 127246, 127248, 127252, 135450, 135448, 135454, 127260, 137500, 135452, 127262, 135462, 127272, 137512, 137522, 127280, 127282, 127286, 137530, 137528, 127290, 137534, 135490, 135488, 127300, 127302, 127304, 135502, 135504, 127318, 127324, 137564, 127326, 127330, 135526, 127332, 137572, 127336, 127340, 137468]]
2024-10-25 03:37:34,497 INFO  org.apache.doris.flink.source.reader.DorisSourceReader       [] - Initialized reader state for split: DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-02:9060_2,be=doris-be-02:9060,tablets=[135426, 137474, 127234, 137478, 127236, 137476, 127238, 135432, 137486, 127244, 137484, 127246, 127248, 127252, 135450, 135448, 135454, 127260, 137500, 135452, 127262, 135462, 127272, 137512, 137522, 127280, 127282, 127286, 137530, 137528, 127290, 137534, 135490, 135488, 127300, 127302, 127304, 135502, 135504, 127318, 127324, 137564, 127326, 127330, 135526, 127332, 137572, 127336, 127340, 137468]
2024-10-25 03:37:34,497 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 7
2024-10-25 03:37:34,497 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - Handling split change SplitAddition:[[DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-02:9060_2,be=doris-be-02:9060,tablets=[135426, 137474, 127234, 137478, 127236, 137476, 127238, 135432, 137486, 127244, 137484, 127246, 127248, 127252, 135450, 135448, 135454, 127260, 137500, 135452, 127262, 135462, 127272, 137512, 137522, 127280, 127282, 127286, 137530, 137528, 127290, 137534, 135490, 135488, 127300, 127302, 127304, 135502, 135504, 127318, 127324, 137564, 127326, 127330, 135526, 127332, 137572, 127336, 127340, 137468]]]
2024-10-25 03:37:34,497 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - Fetch a new split DorisSourceSplit: DB01.nyc_taxi_fare_data,id=doris-be-02:9060_2,be=doris-be-02:9060,tablets=[135426, 137474, 127234, 137478, 127236, 137476, 127238, 135432, 137486, 127244, 137484, 127246, 127248, 127252, 135450, 135448, 135454, 127260, 137500, 135452, 127262, 135462, 127272, 137512, 137522, 127280, 127282, 127286, 137530, 137528, 127290, 137534, 135490, 135488, 127300, 127302, 127304, 135502, 135504, 127318, 127324, 137564, 127326, 127330, 135526, 127332, 137572, 127336, 127340, 137468]
2024-10-25 03:37:34,497 INFO  org.apache.doris.flink.source.reader.DorisSourceSplitReader  [] - create reader for partition: PartitionDefinition{database='DB01', table='nyc_taxi_fare_data', beAddress='doris-be-02:9060', tabletIds=[135426, 137474, 127234, 137478, 127236, 137476, 127238, 135432, 137486, 127244, 137484, 127246, 127248, 127252, 135450, 135448, 135454, 127260, 137500, 135452, 127262, 135462, 127272, 137512, 137522, 127280, 127282, 127286, 137530, 137528, 127290, 137534, 135490, 135488, 127300, 127302, 127304, 135502, 135504, 127318, 127324, 137564, 127326, 127330, 135526, 127332, 137572, 127336, 127340, 137468]}
2024-10-25 03:37:34,498 INFO  org.apache.doris.flink.backend.BackendClient                 [] - Success connect to Doris BE{host='doris-be-02', port=9060}.
2024-10-25 03:37:34,505 INFO  org.apache.doris.flink.backend.BackendClient                 [] - OpenScanner success for Doris BE 'Doris BE{host='doris-be-02', port=9060}' with contextId 'afb0e34d-6378-491a-96a2-d786e95c5492' for tablets '[137730, 137734, 127498, 135692, 137746, 127506, 137750, 127510, 135706, 127514, 137758, 127518, 127520, 135712, 127522, 127524, 135716, 127528, 137768, 127530, 137774, 135724, 127534, 127536, 137784, 139832, 127546, 135742, 137790, 127548, 127552, 137800, 139848, 137806, 127570, 137814, 139862, 127572, 135764, 139860, 139866, 137826, 127586, 135656, 135660, 135666, 135664, 135674, 127480, 135676]'.
2024-10-25 03:37:34,520 INFO  org.apache.doris.flink.backend.BackendClient                 [] - OpenScanner success for Doris BE 'Doris BE{host='doris-be-02', port=9060}' with contextId 'bc833894-5185-4648-b9a0-e603a5529d46' for tablets '[135426, 137474, 127234, 137478, 127236, 137476, 127238, 135432, 137486, 127244, 137484, 127246, 127248, 127252, 135450, 135448, 135454, 127260, 137500, 135452, 127262, 135462, 127272, 137512, 137522, 127280, 127282, 127286, 137530, 137528, 127290, 137534, 135490, 135488, 127300, 127302, 127304, 135502, 135504, 127318, 127324, 137564, 127326, 127330, 135526, 127332, 137572, 127336, 127340, 137468]'.
2024-10-25 03:37:34,533 INFO  org.apache.doris.flink.source.reader.DorisValueReader        [] - Scan finished, tablets: [137730, 137734, 127498, 135692, 137746, 127506, 137750, 127510, 135706, 127514, 137758, 127518, 127520, 135712, 127522, 127524, 135716, 127528, 137768, 127530, 137774, 135724, 127534, 127536, 137784, 139832, 127546, 135742, 137790, 127548, 127552, 137800, 139848, 137806, 127570, 137814, 139862, 127572, 135764, 139860, 139866, 137826, 127586, 135656, 135660, 135666, 135664, 135674, 127480, 135676], offset: 0
2024-10-25 03:37:34,542 INFO  org.apache.doris.flink.source.reader.DorisValueReader        [] - Scan finished, tablets: [135426, 137474, 127234, 137478, 127236, 137476, 127238, 135432, 137486, 127244, 137484, 127246, 127248, 127252, 135450, 135448, 135454, 127260, 137500, 135452, 127262, 135462, 127272, 137512, 137522, 127280, 127282, 127286, 137530, 137528, 127290, 137534, 135490, 135488, 127300, 127302, 127304, 135502, 135504, 127318, 127324, 137564, 127326, 127330, 135526, 127332, 137572, 127336, 127340, 137468], offset: 3
2024-10-25 03:37:34,542 INFO  org.apache.doris.flink.backend.BackendClient                 [] - CloseScanner to Doris BE 'Doris BE{host='doris-be-02', port=9060}' success for contextId afb0e34d-6378-491a-96a2-d786e95c5492 
2024-10-25 03:37:34,542 INFO  org.apache.doris.flink.backend.BackendClient                 [] - Closed a connection to Doris BE{host='doris-be-02', port=9060}.
2024-10-25 03:37:34,543 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished reading from splits [doris-be-02:9060_4]
2024-10-25 03:37:34,543 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished reading split(s) [doris-be-02:9060_4]
2024-10-25 03:37:34,543 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Closing splitFetcher 10 because it is idle.
2024-10-25 03:37:34,543 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Shutting down split fetcher 10
2024-10-25 03:37:34,543 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split fetcher 10 exited.
2024-10-25 03:37:34,543 INFO  org.apache.doris.flink.backend.BackendClient                 [] - CloseScanner to Doris BE 'Doris BE{host='doris-be-02', port=9060}' success for contextId bc833894-5185-4648-b9a0-e603a5529d46 
2024-10-25 03:37:34,544 INFO  org.apache.doris.flink.backend.BackendClient                 [] - Closed a connection to Doris BE{host='doris-be-02', port=9060}.
2024-10-25 03:37:34,544 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished reading from splits [doris-be-02:9060_2]
2024-10-25 03:37:34,544 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished reading split(s) [doris-be-02:9060_2]
2024-10-25 03:37:34,544 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Closing splitFetcher 7 because it is idle.
2024-10-25 03:37:34,544 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Shutting down split fetcher 7
2024-10-25 03:37:34,544 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split fetcher 7 exited.
2024-10-25 03:37:34,554 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Reader received NoMoreSplits event.
2024-10-25 03:37:34,554 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Reader received NoMoreSplits event.
2024-10-25 03:37:34,593 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Closing Source Reader.
2024-10-25 03:37:34,595 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Closing Source Reader.
2024-10-25 03:37:34,601 INFO  org.apache.flink.runtime.io.network.partition.ResultPartition [] - New partitioned file produced: PartitionedFile{numRegions=2, numSubpartitions=4, dataFilePath=/tmp/flink-netty-shuffle-54b69fca-e743-475c-a233-12cf31b7b9c1/136653da5eee3234063b8eaa136c1160.channel.shuffle.data, indexFilePath=/tmp/flink-netty-shuffle-54b69fca-e743-475c-a233-12cf31b7b9c1/136653da5eee3234063b8eaa136c1160.channel.shuffle.index, dataFileSize=676290, indexFileSize=128, numBuffers=110, indexDataCached=true}.
2024-10-25 03:37:34,601 INFO  org.apache.flink.runtime.io.network.partition.ResultPartition [] - New partitioned file produced: PartitionedFile{numRegions=2, numSubpartitions=4, dataFilePath=/tmp/flink-netty-shuffle-54b69fca-e743-475c-a233-12cf31b7b9c1/36751b430e2297f913ae3a38cfa3e980.channel.shuffle.data, indexFilePath=/tmp/flink-netty-shuffle-54b69fca-e743-475c-a233-12cf31b7b9c1/36751b430e2297f913ae3a38cfa3e980.channel.shuffle.index, dataFileSize=391965, indexFileSize=128, numBuffers=62, indexDataCached=true}.
2024-10-25 03:37:34,601 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: doris-source (1/6)#0 (0f33b5b7444e11245dcb129a5cb9e6b6_bc764cd8ddf7a0cff126f51c16239658_0_0) switched from RUNNING to FINISHED.
2024-10-25 03:37:34,601 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: doris-source (2/6)#0 (0f33b5b7444e11245dcb129a5cb9e6b6_bc764cd8ddf7a0cff126f51c16239658_1_0) switched from RUNNING to FINISHED.
2024-10-25 03:37:34,601 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Freeing task resources for Source: doris-source (1/6)#0 (0f33b5b7444e11245dcb129a5cb9e6b6_bc764cd8ddf7a0cff126f51c16239658_0_0).
2024-10-25 03:37:34,601 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Freeing task resources for Source: doris-source (2/6)#0 

@JNSimba
Copy link
Member

JNSimba commented Oct 25, 2024

This has been fixed in this PR apache/doris#42421, you can try it;
Or you can use arrow-flight to read, you can refer to this PR #465

@Misaki030112
Copy link
Author

This has been fixed in this PR apache/doris#42421, you can try it; Or you can use arrow-flight to read, you can refer to this PR #465

Thank you very much, I will close this issue later

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants