Skip to content

Commit

Permalink
[FLINK-37205][python] Correct the state cache behavior during bump be…
Browse files Browse the repository at this point in the history
…am version (apache#26058)
  • Loading branch information
dianfu authored Feb 6, 2025
1 parent aef8c86 commit a498cf2
Showing 1 changed file with 34 additions and 2 deletions.
36 changes: 34 additions & 2 deletions flink-python/pyflink/fn_execution/beam/beam_sdk_worker_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,53 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import re
import sys

# force to register the operations to SDK Harness
from apache_beam.options.pipeline_options import DebugOptions, PipelineOptions

import pyflink.fn_execution.beam.beam_operations # noqa # pylint: disable=unused-import

# force to register the coders to SDK Harness
import pyflink.fn_execution.beam.beam_coders # noqa # pylint: disable=unused-import

import apache_beam.runners.worker.sdk_worker_main
import apache_beam

# disable bundle processor shutdown
from apache_beam.runners.worker import sdk_worker
from apache_beam.runners.worker import sdk_worker, sdk_worker_main, statecache
sdk_worker.DEFAULT_BUNDLE_PROCESSOR_CACHE_SHUTDOWN_THRESHOLD_S = 86400 * 30


# Currently PyFlink only support count-based state cache strategy
def get_deep_size(*objs):
return 1


def get_state_cache_size(options):
"""
Return the maximum size of state cache in count.
"""
if isinstance(options, PipelineOptions):
experiments = options.view_as(DebugOptions).experiments or []
else:
experiments = options

for experiment in experiments:
# There should only be 1 match so returning from the loop
if re.match(r'state_cache_size=', experiment):
return int(
re.match(r'state_cache_size=(?P<state_cache_size>.*)',
experiment).group('state_cache_size'))
return 0


statecache.get_deep_size = get_deep_size
sdk_worker_main._get_state_cache_size = get_state_cache_size
# since Beam 2.52.0, _get_state_cache_size is renamed to _get_state_cache_size_bytes
sdk_worker_main._get_state_cache_size_bytes = get_state_cache_size


def print_to_logging(logging_func, msg, *args, **kwargs):
if msg != '\n':
logging_func(msg, *args, **kwargs)
Expand Down

0 comments on commit a498cf2

Please sign in to comment.