From 8fa237bda31844a18f680a1b884464a3ac983ec2 Mon Sep 17 00:00:00 2001 From: Michael Reneer Date: Thu, 12 Dec 2024 10:55:40 -0800 Subject: [PATCH] Remove `tff.program.X`, use `federated_language.program` instead. For each: * FederatedDataSource * FederatedDataSourceIterator * check_in_federated_context * ComputationArg * contains_only_server_placed_data * FederatedContext * LoggingReleaseManager * MemoryReleaseManager * ProgramStateExistsError * ProgramStateManager * ProgramStateNotFoundError * ProgramStateStructure * ProgramStateValue * DelayedReleaseManager * FilteringReleaseManager * GroupingReleaseManager * NotFilterableError * PeriodicReleaseManager * ReleasableStructure * ReleasableValue * ReleaseManager * MaterializableStructure * MaterializableTypeSignature * MaterializableValue * MaterializableValueReference * materialize_value * MaterializedStructure * MaterializedValue See: * https://github.com/google-parfait/federated-language/commit/2ec477bcd8ecfa7e5422b806450f101d1eed7ecb * https://github.com/google-parfait/tensorflow-federated/commit/0ae85a7839a5a67e52bf1672da9b54111d650f63 for more information. PiperOrigin-RevId: 705561178 --- RELEASE.md | 42 ++++++++-- docs/program/federated_program.md | 10 +-- docs/program/guide.md | 14 ++-- .../learning/federated_program/vizier/BUILD | 6 +- .../federated_program/vizier/data_sources.py | 17 ++-- .../federated_program/vizier/program.py | 25 ++++-- examples/program/BUILD | 7 +- examples/program/program.py | 31 +++++--- examples/program/program_logic.py | 77 +++++++++++-------- examples/program/program_logic_test.py | 29 ++++--- .../programs/evaluation_program_logic.py | 40 +++++----- .../programs/training_program_logic.py | 23 +++--- .../learning/programs/vizier_program_logic.py | 12 +-- tensorflow_federated/python/program/BUILD | 1 - .../python/program/__init__.py | 44 ----------- .../python/program/client_id_data_source.py | 8 +- .../python/program/dataset_data_source.py | 9 ++- .../program/file_program_state_manager.py | 10 ++- .../python/program/file_release_manager.py | 15 ++-- .../python/program/native_platform.py | 12 +-- .../python/program/program_test_utils.py | 2 +- .../python/program/structure_utils.py | 3 +- .../program/tensorboard_release_manager.py | 4 +- .../python/simulation/training_loop.py | 9 ++- 24 files changed, 248 insertions(+), 202 deletions(-) diff --git a/RELEASE.md b/RELEASE.md index 0b7cb0a837..6de0cde56e 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -8,12 +8,6 @@ and this project adheres to ## Unreleased -* Updated `MeasuredProcessOutput` to be a `NamedTuple`. -* Added number of round retries to training metrics in - `tff.learning.programs.train_model`. -* Fix buffer overrun in `AggVectorIterator` when passing in an empty - `TensorData`. - ### Added * `tff.StructType.items()`, this API makes it easier to iterate over @@ -55,6 +49,12 @@ and this project adheres to of the state of a `DPTensorAggregatorBundle`. * `DPTensorAggregatorBundle::TakeOutputs` calls the inner aggregator's `ReportWithEpsilonAndDelta` methods and stitches the outputs together. +* Number of round retries to training metrics in + `tff.learning.programs.train_model`. + +### Fixed + +* Buffer overrun in `AggVectorIterator` when passing in an empty `TensorData`. ### Changed @@ -68,10 +68,40 @@ and this project adheres to check that the input is valid before passing to the aggregators it contains. * Moved the tests of compatibility from `DPQuantileAggregator::MergeWith` to `DPQuantileAggregator::IsCompatible`. +* Updated `MeasuredProcessOutput` to be a `NamedTuple`. ### Removed * `tff.types.tensorflow_to_type`, this function is no longer used. +* `tff.program.X`, use `federated_language.program` instead, for each: + * `FederatedDataSource` + * `FederatedDataSourceIterator` + * `check_in_federated_context` + * `ComputationArg` + * `contains_only_server_placed_data` + * `FederatedContext` + * `LoggingReleaseManager` + * `MemoryReleaseManager` + * `ProgramStateExistsError` + * `ProgramStateManager` + * `ProgramStateNotFoundError` + * `ProgramStateStructure` + * `ProgramStateValue` + * `DelayedReleaseManager` + * `FilteringReleaseManager` + * `GroupingReleaseManager` + * `NotFilterableError` + * `PeriodicReleaseManager` + * `ReleasableStructure` + * `ReleasableValue` + * `ReleaseManager` + * `MaterializableStructure` + * `MaterializableTypeSignature` + * `MaterializableValue` + * `MaterializableValueReference` + * `materialize_value` + * `MaterializedStructure` + * `MaterializedValue` ## Release 0.88.0 diff --git a/docs/program/federated_program.md b/docs/program/federated_program.md index 3387990713..b44f09df62 100644 --- a/docs/program/federated_program.md +++ b/docs/program/federated_program.md @@ -85,8 +85,8 @@ def main() -> None: # Construct the platform-agnostic components. summary_dir = os.path.join(FLAGS.output_dir, 'summary') - metrics_manager = tff.program.GroupingReleaseManager([ - tff.program.LoggingReleaseManager(), + metrics_manager = federated_language.program.GroupingReleaseManager([ + federated_language.program.LoggingReleaseManager(), tff.program.TensorBoardReleaseManager(summary_dir), ]) program_state_dir = os.path.join(..., 'program_state') @@ -177,11 +177,11 @@ For example: async def program_logic( initialize: tff.Computation, train: tff.Computation, - data_source: tff.program.FederatedDataSource, + data_source: federated_language.program.FederatedDataSource, total_rounds: int, num_clients: int, - metrics_manager: tff.program.ReleaseManager[ - tff.program.ReleasableStructure, int + metrics_manager: federated_language.program.ReleaseManager[ + federated_language.program.ReleasableStructure, int ], ) -> None: state = initialize() diff --git a/docs/program/guide.md b/docs/program/guide.md index 8289958709..69224b354f 100644 --- a/docs/program/guide.md +++ b/docs/program/guide.md @@ -27,7 +27,7 @@ program logic that has a type signature. ```python {.good} async def program_logic( train: tff.Computation, - data_source: tff.program.FederatedDataSource, + data_source: federated_language.program.FederatedDataSource, ... ) -> None: """Trains a federated model for some number of rounds. @@ -53,14 +53,14 @@ the program logic that has a type signature. ```python {.good} def _check_program_logic_type_signatures( train: tff.Computation, - data_source: tff.program.FederatedDataSource, + data_source: federated_language.program.FederatedDataSource, ... ) -> None: ... async def program_logic( train: tff.Computation, - data_source: tff.program.FederatedDataSource, + data_source: federated_language.program.FederatedDataSource, ... ) -> None: _check_program_logic_type_signatures( @@ -73,13 +73,13 @@ async def program_logic( ### Type Annotations **Do** provide a well defined Python type for each -[`tff.program.ReleaseManager`](https://www.tensorflow.org/federated/api_docs/python/tff/program/ReleaseManager) +[`federated_language.program.ReleaseManager`](https://www.tensorflow.org/federated/api_docs/python/tff/program/ReleaseManager) parameter supplied to the program logic. ```python {.good} async def program_logic( metrics_manager: Optional[ - tff.program.ReleaseManager[tff.program.ReleasableStructure, int] + federated_language.program.ReleaseManager[federated_language.program.ReleasableStructure, int] ] = None, ... ) -> None: @@ -98,7 +98,7 @@ async def program_logic( ```python {.bad} async def program_logic( - metrics_manager: Optional[tff.program.ReleaseManager] = None, + metrics_manager: Optional[federated_language.program.ReleaseManager] = None, ... ) -> None: ... @@ -148,7 +148,7 @@ async def program_loic(...) -> None: ```python {.good} async def program_logic( - metrics_manager: Optional[tff.program.ReleaseManager] = None, + metrics_manager: Optional[federated_language.program.ReleaseManager] = None, ... ) -> None: """Trains a federated model for some number of rounds. diff --git a/examples/learning/federated_program/vizier/BUILD b/examples/learning/federated_program/vizier/BUILD index 517888071a..74dc8e5097 100644 --- a/examples/learning/federated_program/vizier/BUILD +++ b/examples/learning/federated_program/vizier/BUILD @@ -3,7 +3,10 @@ load("@rules_python//python:defs.bzl", "py_binary", "py_library", "py_test") py_library( name = "data_sources", srcs = ["data_sources.py"], - deps = ["//tensorflow_federated"], + deps = [ + "//tensorflow_federated", + "@federated_language//federated_language", + ], ) py_library( @@ -23,6 +26,7 @@ py_binary( ":learning_process", ":vizier_service", "//tensorflow_federated", + "@federated_language//federated_language", ], ) diff --git a/examples/learning/federated_program/vizier/data_sources.py b/examples/learning/federated_program/vizier/data_sources.py index bda5cf03ad..2501d11b7e 100644 --- a/examples/learning/federated_program/vizier/data_sources.py +++ b/examples/learning/federated_program/vizier/data_sources.py @@ -11,20 +11,19 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -"""The `tff.program.FederatedDataSource`s for this federated program.""" +"""The `federated_language.program.FederatedDataSource`s for this federated program.""" +import federated_language import tensorflow as tf import tensorflow_federated as tff -def create_data_sources() -> ( - tuple[ - tff.program.FederatedDataSource, - tff.program.FederatedDataSource, - tf.TensorSpec, - ] -): - """Creates the `tff.program.FederatedDataSource`s for this program. +def create_data_sources() -> tuple[ + federated_language.program.FederatedDataSource, + federated_language.program.FederatedDataSource, + tf.TensorSpec, +]: + """Creates the `federated_language.program.FederatedDataSource`s for this program. Returns: A `tuple` containing the train data source, evaluation data source, and the diff --git a/examples/learning/federated_program/vizier/program.py b/examples/learning/federated_program/vizier/program.py index ee68d88f9f..22d608d869 100644 --- a/examples/learning/federated_program/vizier/program.py +++ b/examples/learning/federated_program/vizier/program.py @@ -32,6 +32,7 @@ from absl import app from absl import flags from absl import logging +import federated_language import tensorflow_federated as tff from vizier import pyvizier from vizier.client import client_abc @@ -125,19 +126,23 @@ def _train_process_factory( def _model_output_manager_factory( trial: client_abc.TrialInterface, - ) -> tff.program.ReleaseManager[tff.program.ReleasableStructure, str]: + ) -> federated_language.program.ReleaseManager[ + federated_language.program.ReleasableStructure, str + ]: del trial # Unused. - return tff.program.LoggingReleaseManager() + return federated_language.program.LoggingReleaseManager() def _metrics_manager_factory( trial: client_abc.TrialInterface, - ) -> tff.program.ReleaseManager[tff.program.ReleasableStructure, int]: + ) -> federated_language.program.ReleaseManager[ + federated_language.program.ReleasableStructure, int + ]: del trial # Unused. - return tff.program.LoggingReleaseManager() + return federated_language.program.LoggingReleaseManager() def _program_state_manager_factory( trial: client_abc.TrialInterface, - ) -> tff.program.ProgramStateManager: + ) -> federated_language.program.ProgramStateManager: trial_name = f'trial_{trial.id}' root_dir = os.path.join(_OUTPUT_DIR.value, experiment_name, trial_name) return tff.program.FileProgramStateManager(root_dir) @@ -145,7 +150,9 @@ def _program_state_manager_factory( def _evaluation_manager_factory( trial: client_abc.TrialInterface, ) -> tff.learning.programs.EvaluationManager: - aggregated_metrics_manager = tff.program.LoggingReleaseManager() + aggregated_metrics_manager = ( + federated_language.program.LoggingReleaseManager() + ) def _create_state_manager_fn( name: str, @@ -164,11 +171,13 @@ def _create_process_fn( ) -> tuple[ tff.learning.templates.LearningProcess, Optional[ - tff.program.ReleaseManager[tff.program.ReleasableStructure, int] + federated_language.program.ReleaseManager[ + federated_language.program.ReleasableStructure, int + ] ], ]: del name # Unused. - release_manager = tff.program.LoggingReleaseManager() + release_manager = federated_language.program.LoggingReleaseManager() return (evaluation_process, release_manager) return tff.learning.programs.EvaluationManager( diff --git a/examples/program/BUILD b/examples/program/BUILD index 7bd07db1c7..1a8c9cf478 100644 --- a/examples/program/BUILD +++ b/examples/program/BUILD @@ -16,7 +16,10 @@ py_library( py_library( name = "program_logic", srcs = ["program_logic.py"], - deps = ["//tensorflow_federated"], + deps = [ + "//tensorflow_federated", + "@federated_language//federated_language", + ], ) py_binary( @@ -26,6 +29,7 @@ py_binary( ":computations", ":program_logic", "//tensorflow_federated", + "@federated_language//federated_language", ], ) @@ -36,5 +40,6 @@ py_test( ":computations", ":program_logic", "//tensorflow_federated", + "@federated_language//federated_language", ], ) diff --git a/examples/program/program.py b/examples/program/program.py index ed4c0d3457..43beb0f329 100644 --- a/examples/program/program.py +++ b/examples/program/program.py @@ -52,6 +52,7 @@ from absl import app from absl import flags +import federated_language import tensorflow as tf import tensorflow_federated as tff @@ -97,9 +98,11 @@ def main(argv: Sequence[str]) -> None: # Configure the platform-agnostic components. # Create release managers with access to customer storage. - train_metrics_managers = [tff.program.LoggingReleaseManager()] - evaluation_metrics_managers = [tff.program.LoggingReleaseManager()] - model_output_manager = tff.program.LoggingReleaseManager() + train_metrics_managers = [federated_language.program.LoggingReleaseManager()] + evaluation_metrics_managers = [ + federated_language.program.LoggingReleaseManager() + ] + model_output_manager = federated_language.program.LoggingReleaseManager() if _OUTPUT_DIR.value is not None: summary_dir = os.path.join(_OUTPUT_DIR.value, 'summary') @@ -113,20 +116,24 @@ def main(argv: Sequence[str]) -> None: # Group the metrics release managers; program logic may accept a single # release manager to make the implementation of the program logic simpler and # easier to maintain, the program can use a - # `tff.program.GroupingReleaseManager` to release values to multiple - # destinations. + # `federated_language.program.GroupingReleaseManager` to release values to + # multiple destinations. # # Filter the metrics before they are released; the program can use a - # `tff.program.FilteringReleaseManager` to limit the values that are - # released by the program logic. If a formal privacy guarantee is not + # `federated_language.program.FilteringReleaseManager` to limit the values + # that are released by the program logic. If a formal privacy guarantee is not # required, it may be ok to release all the metrics. - train_metrics_manager = tff.program.FilteringReleaseManager( - tff.program.GroupingReleaseManager(train_metrics_managers), + train_metrics_manager = federated_language.program.FilteringReleaseManager( + federated_language.program.GroupingReleaseManager(train_metrics_managers), _filter_metrics, ) - evaluation_metrics_manager = tff.program.FilteringReleaseManager( - tff.program.GroupingReleaseManager(evaluation_metrics_managers), - _filter_metrics, + evaluation_metrics_manager = ( + federated_language.program.FilteringReleaseManager( + federated_language.program.GroupingReleaseManager( + evaluation_metrics_managers + ), + _filter_metrics, + ) ) # Create a program state manager with access to platform storage. diff --git a/examples/program/program_logic.py b/examples/program/program_logic.py index 7816c9c6b1..47cfc4417e 100644 --- a/examples/program/program_logic.py +++ b/examples/program/program_logic.py @@ -28,6 +28,7 @@ import typing from typing import NamedTuple, Optional +import federated_language import tensorflow_federated as tff @@ -39,9 +40,9 @@ def _check_expected_type_signatures( *, initialize: tff.Computation, train: tff.Computation, - train_data_source: tff.program.FederatedDataSource, + train_data_source: federated_language.program.FederatedDataSource, evaluation: tff.Computation, - evaluation_data_source: tff.program.FederatedDataSource, + evaluation_data_source: federated_language.program.FederatedDataSource, ) -> None: """Checks the computations and data sources for the expected type signatures. @@ -57,12 +58,12 @@ def _check_expected_type_signatures( Args: initialize: A `tff.Computation` to invoke before training. train: A `tff.Computation` to invoke during training. - train_data_source: A `tff.program.FederatedDataSource` which returns client - data used during training. + train_data_source: A `federated_language.program.FederatedDataSource` which + returns client data used during training. evaluation: A `tff.Computation` to invoke to evaluate the model produced after training. - evaluation_data_source: A `tff.program.FederatedDataSource` which returns - client data used during evaluation. + evaluation_data_source: A `federated_language.program.FederatedDataSource` + which returns client data used during evaluation. Raises: UnexpectedTypeSignatureError: If the computations or data sources have an @@ -328,36 +329,43 @@ class _ProgramState(NamedTuple): Attributes: state: The server state produced at `round_num`. round_num: The training round. - iterator: The training `tff.program.FederatedDataSourceIterator`. + iterator: The training + `federated_language.program.FederatedDataSourceIterator`. """ state: object round_num: int - iterator: tff.program.FederatedDataSourceIterator + iterator: federated_language.program.FederatedDataSourceIterator async def train_federated_model( *, initialize: tff.Computation, train: tff.Computation, - train_data_source: tff.program.FederatedDataSource, + train_data_source: federated_language.program.FederatedDataSource, evaluation: tff.Computation, - evaluation_data_source: tff.program.FederatedDataSource, + evaluation_data_source: federated_language.program.FederatedDataSource, total_rounds: int, num_clients: int, train_metrics_manager: Optional[ - tff.program.ReleaseManager[tff.program.ReleasableStructure, int] + federated_language.program.ReleaseManager[ + federated_language.program.ReleasableStructure, int + ] ] = None, evaluation_metrics_manager: Optional[ - tff.program.ReleaseManager[tff.program.ReleasableStructure, int] + federated_language.program.ReleaseManager[ + federated_language.program.ReleasableStructure, int + ] ] = None, model_output_manager: Optional[ - tff.program.ReleaseManager[ - tff.program.ReleasableStructure, Optional[object] + federated_language.program.ReleaseManager[ + federated_language.program.ReleasableStructure, Optional[object] ] ] = None, program_state_manager: Optional[ - tff.program.ProgramStateManager[tff.program.ProgramStateStructure] + federated_language.program.ProgramStateManager[ + federated_language.program.ProgramStateStructure + ] ] = None, ) -> None: """Trains a federated model for some number of rounds. @@ -403,25 +411,29 @@ async def train_federated_model( Args: initialize: A `tff.Computation` to invoke before training. train: A `tff.Computation` to invoke during training. - train_data_source: A `tff.program.FederatedDataSource` which returns client - data used during training. + train_data_source: A `federated_language.program.FederatedDataSource` which + returns client data used during training. evaluation: A `tff.Computation` to invoke to evaluate the model produced after training. - evaluation_data_source: A `tff.program.FederatedDataSource` which returns - client data used during evaluation. + evaluation_data_source: A `federated_language.program.FederatedDataSource` + which returns client data used during evaluation. total_rounds: The number of training rounds to run. num_clients: The number of clients for each round of training and for evaluation. - train_metrics_manager: An optional `tff.program.ReleaseManager` used to - release training metrics. - evaluation_metrics_manager: An optional `tff.program.ReleaseManager` used to - release evaluation metrics. - model_output_manager: An optional `tff.program.ReleaseManager` used to - release training output. - program_state_manager: An optional `tff.program.ProgramStateManager` used to - save program state for fault tolerance. + train_metrics_manager: An optional + `federated_language.program.ReleaseManager` used to release training + metrics. + evaluation_metrics_manager: An optional + `federated_language.program.ReleaseManager` used to release evaluation + metrics. + model_output_manager: An optional + `federated_language.program.ReleaseManager` used to release training + output. + program_state_manager: An optional + `federated_language.program.ProgramStateManager` used to save program + state for fault tolerance. """ - tff.program.check_in_federated_context() + federated_language.program.check_in_federated_context() _check_expected_type_signatures( initialize=initialize, train=train, @@ -432,13 +444,14 @@ async def train_federated_model( # Cast the `program_state_manager` to a more specific type: a manager that # loads and saves `_ProgramState`s instead of a manager that loads and saves - # `tff.program.ProgramStateStructure`s. This allows the program logic to: + # `federated_language.program.ProgramStateStructure`s. This allows the program + # logic to: # * Keep `_ProgramState` private. # * Have static typing within the program logic. # * Require callers to provide a `program_state_manager` capable of handling - # any `tff.program.ProgramStateStructure`. + # any `federated_language.program.ProgramStateStructure`. program_state_manager = typing.cast( - Optional[tff.program.ProgramStateManager[_ProgramState]], + Optional[federated_language.program.ProgramStateManager[_ProgramState]], program_state_manager, ) @@ -451,7 +464,7 @@ async def train_federated_model( # previous run, this program state can be used to restore the execution of # this program logic and skip unnecessary steps. if program_state_manager is not None: - state = await tff.program.materialize_value(state) + state = await federated_language.program.materialize_value(state) structure = _ProgramState( state=state, round_num=0, diff --git a/examples/program/program_logic_test.py b/examples/program/program_logic_test.py index 2cffbb3fd8..84b488bdbc 100644 --- a/examples/program/program_logic_test.py +++ b/examples/program/program_logic_test.py @@ -20,6 +20,7 @@ from absl.testing import absltest from absl.testing import parameterized +import federated_language import numpy as np import tensorflow as tf import tensorflow_federated as tff @@ -45,7 +46,9 @@ def _create_mock_data_source_iterator( data: Optional[object] = None, ) -> mock.Mock: mock_data_source_iterator = mock.create_autospec( - tff.program.FederatedDataSourceIterator, spec_set=True, instance=True + federated_language.program.FederatedDataSourceIterator, + spec_set=True, + instance=True, ) type(mock_data_source_iterator).federated_type = mock.PropertyMock( spec=tff.FederatedType, return_value=federated_type, spec_set=True @@ -57,13 +60,17 @@ def _create_mock_data_source_iterator( def _create_mock_data_source( *, federated_type: tff.FederatedType, - iterator: Optional[tff.program.FederatedDataSourceIterator] = None, + iterator: Optional[ + federated_language.program.FederatedDataSourceIterator + ] = None, ) -> mock.Mock: if iterator is None: iterator = _create_mock_data_source_iterator(federated_type=federated_type) mock_data_source = mock.create_autospec( - tff.program.FederatedDataSource, spec_set=True, instance=True + federated_language.program.FederatedDataSource, + spec_set=True, + instance=True, ) type(mock_data_source).federated_type = mock.PropertyMock( spec=tff.Type, return_value=federated_type, spec_set=True @@ -131,7 +138,9 @@ def _create_mock_program_state_manager( version: int = 0, ) -> mock.Mock: mock_program_state_manager = mock.create_autospec( - tff.program.ProgramStateManager, spec_set=True, instance=True + federated_language.program.ProgramStateManager, + spec_set=True, + instance=True, ) mock_program_state_manager.load_latest.return_value = (program_state, version) return mock_program_state_manager @@ -309,13 +318,13 @@ async def test_calls_program_components( iterator=mock_evaluation_data_source_iterator, ) mock_train_metrics_manager = mock.create_autospec( - tff.program.ReleaseManager, spec_set=True, instance=True + federated_language.program.ReleaseManager, spec_set=True, instance=True ) mock_evaluation_metrics_manager = mock.create_autospec( - tff.program.ReleaseManager, spec_set=True, instance=True + federated_language.program.ReleaseManager, spec_set=True, instance=True ) mock_model_output_manager = mock.create_autospec( - tff.program.ReleaseManager, spec_set=True, instance=True + federated_language.program.ReleaseManager, spec_set=True, instance=True ) if mock_program_state_manager_factory is not None: if round_num != 0: @@ -458,13 +467,13 @@ async def test_fault_tolerance(self): evaluation_data_source = tff.program.DatasetDataSource(datasets) num_clients = 3 mock_train_metrics_manager = mock.create_autospec( - tff.program.ReleaseManager, spec_set=True, instance=True + federated_language.program.ReleaseManager, spec_set=True, instance=True ) mock_evaluation_metrics_manager = mock.create_autospec( - tff.program.ReleaseManager, spec_set=True, instance=True + federated_language.program.ReleaseManager, spec_set=True, instance=True ) mock_model_output_manager = mock.create_autospec( - tff.program.ReleaseManager, spec_set=True, instance=True + federated_language.program.ReleaseManager, spec_set=True, instance=True ) program_state_dir = self.create_tempdir() program_state_manager = tff.program.FileProgramStateManager( diff --git a/tensorflow_federated/python/learning/programs/evaluation_program_logic.py b/tensorflow_federated/python/learning/programs/evaluation_program_logic.py index 51904cf43e..2452939fa9 100644 --- a/tensorflow_federated/python/learning/programs/evaluation_program_logic.py +++ b/tensorflow_federated/python/learning/programs/evaluation_program_logic.py @@ -140,7 +140,8 @@ async def save( """Saves `program_state` and automatically advances the version number. Args: - program_state: A `tff.program.ProgramStateStructure` to save. + program_state: A `federated_language.program.ProgramStateStructure` to + save. """ async with self._lock: await self._state_manager.save(program_state, version=self._next_version) @@ -161,7 +162,9 @@ class EvaluationManager: 3. If the program has restarted, load the most recent state of in-progress evaluations and restart each of the evaluations. - This class uses N + 1 `tff.program.ProgramStateManagers` to enable resumable + This class uses N + 1 `federated_language.program.ProgramStateManagers` to + enable + resumable evaluations. * The first state managers is for this class itself, and manages the list of @@ -204,18 +207,19 @@ def __init__( """Creates an EvaluationManager. Args: - data_source: A `tff.program.FederatedDataSource` that the manager will use - to create iterators for evaluation loops. - aggregated_metrics_manager: A `tff.program.ReleaseManager` for releasing - the total aggregated metrics at the end of the evaluation loop. + data_source: A `federated_language.program.FederatedDataSource` that the + manager will use to create iterators for evaluation loops. + aggregated_metrics_manager: A `federated_language.program.ReleaseManager` + for releasing the total aggregated metrics at the end of the evaluation + loop. create_state_manager_fn: A callable that returns a `tff.program.FileProgramStateManager` that will be used to create the overall evaluation manager's state manager, and each per evaluation loop state manager that will enable resuming and checkpointing. create_process_fn: A callable that returns a 2-tuple of `tff.learning.templates.LearningProcess` and - `tff.program.ReleaseManager` for the per-evaluation round metrics - releasing that will used be to start each evaluation loop. + `federated_language.program.ReleaseManager` for the per-evaluation round + metrics releasing that will used be to start each evaluation loop. cohort_size: An integer denoting the size of each evaluation round to select from the iterator created from `data_source`. duration: The `datetime.timedelta` duration to run each evaluation loop. @@ -587,22 +591,22 @@ async def _run_evaluation( evaluate the model produced after training. This process must have been created using `tff.learning.algorithms.build_fed_eval`. evaluation_name: A str name of the evaluation computation. - evaluation_data_source: A `tff.program.FederatedDataSource` which returns - client data used during evaluation. + evaluation_data_source: A `federated_language.program.FederatedDataSource` + which returns client data used during evaluation. evaluation_per_round_clients_number: Number of clients to evaluate in each round. evaluation_end_time: Expected end time for running the evaluation. Multiple evaluation rounds will be run until the `evaluation_end_time` has reached. If the `evaluation_end_time` has passed, only one round will be run. - per_round_metrics_manager: A `tff.program.ReleaseManager` that releases the - per-round evaluation metrics from platform to user storage. Use a + per_round_metrics_manager: A `federated_language.program.ReleaseManager` + that releases the per-round evaluation metrics from platform to user + storage. Use a `tff.programs.GroupingReleaseManager` to utilize multiple + release managers. If `None`, per-round metrics are not released. + aggregated_metrics_manager: A `federated_language.program.ReleaseManager` + that releases the evaluation metrics aggregated across the entire + evaluation loop from platform to user storage. Use a `tff.programs.GroupingReleaseManager` to utilize multiple release - managers. If `None`, per-round metrics are not released. - aggregated_metrics_manager: A `tff.program.ReleaseManager` that releases the - evaluation metrics aggregated across the entire evaluation loop from - platform to user storage. Use a `tff.programs.GroupingReleaseManager` to - utilize multiple release managers. If `None`, aggregated evaluation - metrics are not released. + managers. If `None`, aggregated evaluation metrics are not released. Raises: TypeError: If result of `evaluation_process` is not a value of diff --git a/tensorflow_federated/python/learning/programs/training_program_logic.py b/tensorflow_federated/python/learning/programs/training_program_logic.py index d2a44b4d74..467876327b 100644 --- a/tensorflow_federated/python/learning/programs/training_program_logic.py +++ b/tensorflow_federated/python/learning/programs/training_program_logic.py @@ -48,8 +48,8 @@ class ProgramState(NamedTuple): round_number: The current round number. next_evaluation_timestamp_seconds: The timestamp of the next evaluation in seconds. - data_iterator: The `tff.program.FederatedDataSourceIterator` used for - training. + data_iterator: The `federated_language.program.FederatedDataSourceIterator` + used for training. """ state: composers.LearningAlgorithmState @@ -164,8 +164,8 @@ async def train_model( the train process. Its type signature should match the `type_signature` of the result of `train_process.initialize`. If not specified, use the retsult of `train_process.initialize`. - train_data_source: A `tff.program.FederatedDataSource` which returns client - data used during training. + train_data_source: A `federated_language.program.FederatedDataSource` which + returns client data used during training. train_per_round_clients: The number of clients per round of training. train_total_rounds: Total number of rounds of training. should_retry_round: A Callable that takes the @@ -173,13 +173,14 @@ async def train_model( `training_process.next` and returns whether the round should be retried. If a round should be retried, the program will roll back to the state of the previous round and retry this round. - program_state_manager: A `tff.program.ProgramStateManager` used to save - program state for fault tolerance. - model_output_manager: A `tff.program.ReleaseManager` to release the model, - the results can be used for building inference models after training, or - warm-starting future training loops. - train_metrics_manager: A `tff.program.ReleaseManager` to release metrics of - training. Use `tff.program.GroupingReleaseManager` to supply multiple + program_state_manager: A `federated_language.program.ProgramStateManager` + used to save program state for fault tolerance. + model_output_manager: A `federated_language.program.ReleaseManager` to + release the model, the results can be used for building inference models + after training, or warm-starting future training loops. + train_metrics_manager: A `federated_language.program.ReleaseManager` to + release metrics of training. Use + `federated_language.program.GroupingReleaseManager` to supply multiple release managers. evaluation_manager: An `EvaluationManager` used to create a state manager for each evaluation loop that is forked off from the training loop. diff --git a/tensorflow_federated/python/learning/programs/vizier_program_logic.py b/tensorflow_federated/python/learning/programs/vizier_program_logic.py index 0747bea9fd..14933de961 100644 --- a/tensorflow_federated/python/learning/programs/vizier_program_logic.py +++ b/tensorflow_federated/python/learning/programs/vizier_program_logic.py @@ -174,17 +174,17 @@ async def train_model_with_vizier( evaluating the model. train_process_factory: A factory for creating `tff.learning.templates.LearningProcess` to run for training. - train_data_source: A `tff.program.FederatedDataSource` which returns client - data used during training. + train_data_source: A `federated_language.program.FederatedDataSource` which + returns client data used during training. total_rounds: The number of rounds of training. num_clients: The number of clients per round of training. program_state_manager_factory: A factory for creating - `tff.program.ProgramStateManager`s for each trail. + `federated_language.program.ProgramStateManager`s for each trail. model_output_manager_factory: A factory for creating - `tff.program.ReleaseManager`s used to release the model. + `federated_language.program.ReleaseManager`s used to release the model. train_metrics_manager_factory: A factory for creating - `tff.program.ReleaseManager`s used to release training metrics for each - trail. + `federated_language.program.ReleaseManager`s used to release training + metrics for each trail. evaluation_manager_factory: A factory for creating `tff.learning.programs.EvaluationManager`s for each trail. evaluation_periodicity: Either a integer number of rounds or diff --git a/tensorflow_federated/python/program/BUILD b/tensorflow_federated/python/program/BUILD index 150eb07db5..a77dc5d57d 100644 --- a/tensorflow_federated/python/program/BUILD +++ b/tensorflow_federated/python/program/BUILD @@ -28,7 +28,6 @@ py_library( ":file_release_manager", ":native_platform", ":tensorboard_release_manager", - "@federated_language//federated_language", ], ) diff --git a/tensorflow_federated/python/program/__init__.py b/tensorflow_federated/python/program/__init__.py index ebe33235a5..8eb24ecefd 100644 --- a/tensorflow_federated/python/program/__init__.py +++ b/tensorflow_federated/python/program/__init__.py @@ -13,61 +13,17 @@ # limitations under the License. """Libraries for creating federated programs.""" -import federated_language # pylint: disable=g-importing-member from tensorflow_federated.python.program.client_id_data_source import ClientIdDataSource from tensorflow_federated.python.program.client_id_data_source import ClientIdDataSourceIterator - -FederatedDataSource = federated_language.program.FederatedDataSource -FederatedDataSourceIterator = ( - federated_language.program.FederatedDataSourceIterator -) from tensorflow_federated.python.program.dataset_data_source import DatasetDataSource from tensorflow_federated.python.program.dataset_data_source import DatasetDataSourceIterator - -check_in_federated_context = ( - federated_language.program.check_in_federated_context -) -ComputationArg = federated_language.program.ComputationArg -contains_only_server_placed_data = ( - federated_language.program.contains_only_server_placed_data -) -FederatedContext = federated_language.program.FederatedContext from tensorflow_federated.python.program.file_program_state_manager import FileProgramStateManager from tensorflow_federated.python.program.file_release_manager import CSVFileReleaseManager from tensorflow_federated.python.program.file_release_manager import CSVKeyFieldnameNotFoundError from tensorflow_federated.python.program.file_release_manager import CSVSaveMode from tensorflow_federated.python.program.file_release_manager import SavedModelFileReleaseManager - -LoggingReleaseManager = federated_language.program.LoggingReleaseManager -MemoryReleaseManager = federated_language.program.MemoryReleaseManager from tensorflow_federated.python.program.native_platform import NativeFederatedContext from tensorflow_federated.python.program.native_platform import NativeValueReference - -ProgramStateExistsError = federated_language.program.ProgramStateExistsError -ProgramStateManager = federated_language.program.ProgramStateManager -ProgramStateNotFoundError = federated_language.program.ProgramStateNotFoundError -ProgramStateStructure = federated_language.program.ProgramStateStructure -ProgramStateValue = federated_language.program.ProgramStateValue -DelayedReleaseManager = federated_language.program.DelayedReleaseManager -FilteringReleaseManager = federated_language.program.FilteringReleaseManager -GroupingReleaseManager = federated_language.program.GroupingReleaseManager -NotFilterableError = federated_language.program.NotFilterableError -PeriodicReleaseManager = federated_language.program.PeriodicReleaseManager -ReleasableStructure = federated_language.program.ReleasableStructure -ReleasableValue = federated_language.program.ReleasableValue -ReleaseManager = federated_language.program.ReleaseManager from tensorflow_federated.python.program.tensorboard_release_manager import TensorBoardReleaseManager - -MaterializableStructure = federated_language.program.MaterializableStructure -MaterializableTypeSignature = ( - federated_language.program.MaterializableTypeSignature -) -MaterializableValue = federated_language.program.MaterializableValue -MaterializableValueReference = ( - federated_language.program.MaterializableValueReference -) -materialize_value = federated_language.program.materialize_value -MaterializedStructure = federated_language.program.MaterializedStructure -MaterializedValue = federated_language.program.MaterializedValue # pylint: enable=g-importing-member diff --git a/tensorflow_federated/python/program/client_id_data_source.py b/tensorflow_federated/python/program/client_id_data_source.py index c3de371569..9bbaf94765 100644 --- a/tensorflow_federated/python/program/client_id_data_source.py +++ b/tensorflow_federated/python/program/client_id_data_source.py @@ -26,9 +26,11 @@ class ClientIdDataSourceIterator( federated_language.program.FederatedDataSourceIterator ): - """A `tff.program.FederatedDataSourceIterator` backed by client ids. + """A `federated_language.program.FederatedDataSourceIterator` backed by client ids. - A `tff.program.FederatedDataSourceIterator` backed by sequence of client ids, + A `federated_language.program.FederatedDataSourceIterator` backed by sequence + of + client ids, one client id per client. It selects client ids uniformly at random, with replacement over successive calls of `select()` but without replacement within a single call of `select()`. @@ -101,7 +103,7 @@ def __eq__(self, other: object) -> bool: class ClientIdDataSource(federated_language.program.FederatedDataSource): - """A `tff.program.FederatedDataSource` backed by client ids.""" + """A `federated_language.program.FederatedDataSource` backed by client ids.""" def __init__(self, client_ids: Sequence[str]): """Returns an initialized `tff.program.ClientIdDataSource`. diff --git a/tensorflow_federated/python/program/dataset_data_source.py b/tensorflow_federated/python/program/dataset_data_source.py index 91da5e818f..6a2e3d7d60 100644 --- a/tensorflow_federated/python/program/dataset_data_source.py +++ b/tensorflow_federated/python/program/dataset_data_source.py @@ -27,9 +27,10 @@ class DatasetDataSourceIterator( federated_language.program.FederatedDataSourceIterator ): - """A `tff.program.FederatedDataSourceIterator` backed by `tf.data.Dataset`s. + """A `federated_language.program.FederatedDataSourceIterator` backed by `tf.data.Dataset`s. - A `tff.program.FederatedDataSourceIterator` backed by a sequence of + A `federated_language.program.FederatedDataSourceIterator` backed by a + sequence of `tf.data.Dataset's, one `tf.data.Dataset' per client. It selects datasources uniformly at random, with replacement over successive calls of `select()` but without replacement within a single call of `select()`. @@ -135,9 +136,9 @@ def __eq__(self, other: object) -> bool: class DatasetDataSource(federated_language.program.FederatedDataSource): - """A `tff.program.FederatedDataSource` backed by `tf.data.Dataset`s. + """A `federated_language.program.FederatedDataSource` backed by `tf.data.Dataset`s. - A `tff.program.FederatedDataSource` backed by a sequence of + A `federated_language.program.FederatedDataSource` backed by a sequence of `tf.data.Dataset's, one `tf.data.Dataset' per client, and selects data uniformly random with replacement. """ diff --git a/tensorflow_federated/python/program/file_program_state_manager.py b/tensorflow_federated/python/program/file_program_state_manager.py index e65930bd46..7efe45fbb5 100644 --- a/tensorflow_federated/python/program/file_program_state_manager.py +++ b/tensorflow_federated/python/program/file_program_state_manager.py @@ -38,7 +38,7 @@ class FileProgramStateManager( federated_language.program.ProgramStateStructure ] ): - """A `tff.program.ProgramStateManager` that is backed by a file system. + """A `federated_language.program.ProgramStateManager` that is backed by a file system. A `tff.program.FileProgramStateManager` is a utility for saving and loading program state to a file system in a federated program and is used to implement @@ -47,7 +47,8 @@ class FileProgramStateManager( Program state is saved to the file system using the SavedModel (see `tf.saved_model`) format. When the program state is saved, each - `tff.program.MaterializableValueReference` is materialized and each + `federated_language.program.MaterializableValueReference` is materialized and + each `tff.Serializable` is serialized. The structure of the program state is discarded, but is required to load the program state. @@ -67,7 +68,7 @@ def __init__( keep_first: bool = True, keep_every_k: int = 1, ): - """Returns an initialized `tff.program.ProgramStateManager`. + """Returns an initialized `federated_language.program.ProgramStateManager`. Args: root_dir: A path on the file system to save program state. If this path @@ -261,7 +262,8 @@ async def save( """Saves `program_state` for the given `version`. Args: - program_state: A `tff.program.ProgramStateStructure` to save. + program_state: A `federated_language.program.ProgramStateStructure` to + save. version: A strictly increasing integer representing the version of a saved `program_state`. diff --git a/tensorflow_federated/python/program/file_release_manager.py b/tensorflow_federated/python/program/file_release_manager.py index 727f396ad9..c1626e2796 100644 --- a/tensorflow_federated/python/program/file_release_manager.py +++ b/tensorflow_federated/python/program/file_release_manager.py @@ -65,7 +65,7 @@ class CSVFileReleaseManager( federated_language.program.ReleasableStructure, int ] ): - """A `tff.program.ReleaseManager` that releases values to a CSV file. + """A `federated_language.program.ReleaseManager` that releases values to a CSV file. A `tff.program.CSVFileReleaseManager` is a utility for releasing values from a federated program to a CSV file and is used to release values from @@ -73,7 +73,9 @@ class CSVFileReleaseManager( Values are released to the file system as a CSV file and are quoted as strings. When the value is released, each - `tff.program.MaterializableValueReference` is materialized. The value is then + `federated_language.program.MaterializableValueReference` is materialized. The + value + is then flattened, converted to a `numpy.ndarray`, and then converted to a nested list of Python scalars, and released as a CSV file. For example, `1` will be written as `'1'` and `tf.constant([[1, 1], [1, 1]])` will be written as @@ -261,7 +263,7 @@ async def release( writing `value`. Args: - value: A `tff.program.ReleasableStructure` to release. + value: A `federated_language.program.ReleasableStructure` to release. key: An integer used to reference the released `value`; `key` represents a step in a federated program. """ @@ -295,7 +297,7 @@ class SavedModelFileReleaseManager( federated_language.program.Key, ] ): - """A `tff.program.ReleaseManager` that releases values to a file system. + """A `federated_language.program.ReleaseManager` that releases values to a file system. A `tff.program.SavedModelFileReleaseManager` is a utility for releasing values from a federated program to a file system and is used to release values from @@ -303,7 +305,8 @@ class SavedModelFileReleaseManager( Values are released to the file system using the SavedModel (see `tf.saved_model`) format. When the value is released, each - `tff.program.MaterializableValueReference` is materialized. The structure of + `federated_language.program.MaterializableValueReference` is materialized. The + structure of the value is discarded. Note: The SavedModel format can only contain values that can be converted to a @@ -355,7 +358,7 @@ async def release( """Releases `value` from a federated program. Args: - value: A `tff.program.ReleasableStructure` to release. + value: A `federated_language.program.ReleasableStructure` to release. key: Used to reference (in the file system) the released `value`. """ path = self._get_path_for_key(key) diff --git a/tensorflow_federated/python/program/native_platform.py b/tensorflow_federated/python/program/native_platform.py index 1dc6195d3a..758a71b35d 100644 --- a/tensorflow_federated/python/program/native_platform.py +++ b/tensorflow_federated/python/program/native_platform.py @@ -27,7 +27,7 @@ class NativeValueReference( federated_language.program.MaterializableValueReference ): - """A `tff.program.MaterializableValueReference` backed by a task.""" + """A `federated_language.program.MaterializableValueReference` backed by a task.""" def __init__( self, @@ -136,7 +136,7 @@ async def _get_item( class NativeFederatedContext(federated_language.program.FederatedContext): - """A `tff.program.FederatedContext` backed by an execution context.""" + """A `federated_language.program.FederatedContext` backed by an execution context.""" def __init__( self, context: federated_language.framework.AsyncExecutionContext @@ -158,13 +158,13 @@ def invoke( Args: comp: The `tff.Computation` being invoked. arg: The optional argument of `comp`; server-placed values must be - represented by `tff.program.MaterializableStructure`, and client-placed - values must be represented by structures of values returned by a - `tff.program.FederatedDataSourceIterator`. + represented by `federated_language.program.MaterializableStructure`, and + client-placed values must be represented by structures of values + returned by a `federated_language.program.FederatedDataSourceIterator`. Returns: The result of invocation; a structure of - `tff.program.MaterializableValueReference`. + `federated_language.program.MaterializableValueReference`. Raises: ValueError: If the result type of the invoked computation does not contain diff --git a/tensorflow_federated/python/program/program_test_utils.py b/tensorflow_federated/python/program/program_test_utils.py index b0441043d5..94daca2ffe 100644 --- a/tensorflow_federated/python/program/program_test_utils.py +++ b/tensorflow_federated/python/program/program_test_utils.py @@ -36,7 +36,7 @@ class TestMaterializableValueReference( federated_language.program.MaterializableValueReference ): - """A test implementation of `tff.program.MaterializableValueReference`.""" + """A test implementation of `federated_language.program.MaterializableValueReference`.""" def __init__(self, value: federated_language.program.MaterializedValue): self._value = value diff --git a/tensorflow_federated/python/program/structure_utils.py b/tensorflow_federated/python/program/structure_utils.py index 5a620525a4..160869eb80 100644 --- a/tensorflow_federated/python/program/structure_utils.py +++ b/tensorflow_federated/python/program/structure_utils.py @@ -27,7 +27,8 @@ T = TypeVar('T') # This type defines the structures supported by the `tff.program` API, meaning # values of type `T` nested in structures defined by this type. For an example -# of how to use this type see `tff.program.MaterializedStructure`. +# of how to use this type see +# `federated_language.program.MaterializedStructure`. Structure = Union[ T, Sequence['Structure[T]'], diff --git a/tensorflow_federated/python/program/tensorboard_release_manager.py b/tensorflow_federated/python/program/tensorboard_release_manager.py index 4694d8fa23..67430c66cf 100644 --- a/tensorflow_federated/python/program/tensorboard_release_manager.py +++ b/tensorflow_federated/python/program/tensorboard_release_manager.py @@ -28,7 +28,7 @@ class TensorBoardReleaseManager( federated_language.program.ReleasableStructure, int ] ): - """A `tff.program.ReleaseManager` that releases values to TensorBoard. + """A `federated_language.program.ReleaseManager` that releases values to TensorBoard. A `tff.program.TensorBoardReleaseManager` is a utility for releasing values from a federated program to TensorBoard and is used to release values from @@ -73,7 +73,7 @@ async def release( """Releases `value` from a federated program. Args: - value: A `tff.program.ReleasableStructure` to release. + value: A `federated_language.program.ReleasableStructure` to release. key: A integer used to reference the released `value`; `key` represents a step in a federated program. """ diff --git a/tensorflow_federated/python/simulation/training_loop.py b/tensorflow_federated/python/simulation/training_loop.py index a80b61b80b..95df211aca 100644 --- a/tensorflow_federated/python/simulation/training_loop.py +++ b/tensorflow_federated/python/simulation/training_loop.py @@ -126,12 +126,13 @@ def run_training_process( round. rounds_per_evaluation: The number of training rounds to run between each invocation of `evaluation_fn`. - program_state_manager: An optional `tff.program.ProgramStateManager` to use - to save program state for fault tolerance. + program_state_manager: An optional + `federated_language.program.ProgramStateManager` to use to save program + state for fault tolerance. rounds_per_saving_program_state: The number of training rounds to run between saving program state. - metrics_managers: An optional list of `tff.program.ReleaseManagers`s to use - to save metrics. + metrics_managers: An optional list of + `federated_language.program.ReleaseManagers`s to use to save metrics. Returns: The `state` of the training process after training.