[Feature]: Question about PIR support for unlabeled PSI #123

Open
winnylyc opened this issue May 9, 2024 · 6 comments
Labels: enhancement (New feature or request)

Comments

winnylyc commented May 9, 2024

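Hello, and sorry to bother you.
I would like to ask whether the following two features are implemented for PIR, or whether their overhead can be reduced.

  1. unlabeled PSI
    I noticed that the APSI library behind the PIR feature supports unlabeled PSI, and it is also said to have lower overhead than labeled PSI. Is this mode supported here, and if so, how do I call it? So far I have tried setting label_columns to [] (that is, no label input), but the call does not seem to run.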
spu.pir_setup(
    server="alice",
    input_path="/root/project/psi1/alice_exactpsi_1e6.csv",
    key_columns=['name'],
    label_columns=[],
    oprf_key_path="/root/project/psi1/alice_oprf_key",
    setup_path="/root/project/psi1/alice_exactpsi_setup_npq20",
    num_per_query=20,
    label_max_len=80,
    bucket_size=1000000
)

The result of running this on SecretNote is shown below.

Bob's Output:
2024-05-09 14:55:07.545 WARNING api.py:607 [bob] -- [Anonymous_job] Encounter RemoteError happend in other parties, error message: FedRemoteError occurred at alice
RayTaskError(FedRemoteError): ray::ReceiverProxyActor.get_data() (pid=201684, ip=192.168.15.7, actor_id=57475061dc167106e064d65801000000, repr=<fed.proxy.barriers.ReceiverProxyActor object at 0x7f08dc26df60>)
  File "/root/anaconda3/envs/psi/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/root/anaconda3/envs/psi/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/proxy/barriers.py", line 236, in get_data
    raise data
fed.exceptions.FedRemoteError: FedRemoteError occurred at alice
---------------------------------------------------------------------------
RayTaskError(FedRemoteError)              Traceback (most recent call last)
Cell In[3], line 1
----> 1 spu.pir_setup(
      2     server="alice",
      3     input_path="/root/project/psi1/alice_exactpsi_1e6.csv",
      4     key_columns=['name'],
      5     # label_columns=['country', 'location'],
      6     label_columns=[],
      7     oprf_key_path="/root/project/psi1/alice_oprf_key",
      8     setup_path="/root/project/psi1/alice_exactpsi_setup_npq20",
      9     num_per_query=20,
     10     label_max_len=80,
     11     bucket_size=1000000
     12 )

File ~/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py:1990, in SPU.pir_setup(self, server, input_path, key_columns, label_columns, oprf_key_path, setup_path, num_per_query, label_max_len, bucket_size)
   1961 def pir_setup(
   1962     self,
   1963     server: str,
   (...)
   1971     bucket_size: int,
   1972 ):
   1973     """Private information retrival offline setup.
   1974     Args:
   1975         server (str): Which party is pir server.
   (...)
   1988         Dict: PIR report output by SPU.
   1989     """
-> 1990     return dispatch(
   1991         'pir_setup',
   1992         self,
   1993         server,
   1994         input_path,
   1995         key_columns,
   1996         label_columns,
   1997         oprf_key_path,
   1998         setup_path,
   1999         num_per_query,
   2000         label_max_len,
   2001         bucket_size,
   2002     )

File ~/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/register.py:111, in dispatch(name, self, *args, **kwargs)
    101 def dispatch(name: str, self, *args, **kwargs):
    102     """Dispatch device kernel.
    103 
    104     Args:
   (...)
    109         Kernel execution result.
    110     """
--> 111     return _registrar.dispatch(self.device_type, name, self, *args, **kwargs)

File ~/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/register.py:80, in Registrar.dispatch(self, device_type, name, *args, **kwargs)
     78 if name not in self._ops[device_type]:
     79     raise KeyError(f'device: {device_type}, op: {name} not registered')
---> 80 return self._ops[device_type][name](*args, **kwargs)

File ~/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/kernels/spu.py:521, in pir_setup(device, server, input_path, key_columns, label_columns, oprf_key_path, setup_path, num_per_query, label_max_len, bucket_size)
    506 res.append(
    507     actor.pir_setup.remote(
    508         server,
   (...)
    517     )
    518 )
    520 # wait for all tasks done
--> 521 return sfd.get(res)

File ~/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/distributed/primitive.py:156, in get(object_refs)
    148 def get(
    149     object_refs: Union[
    150         Union[ray.ObjectRef, List[ray.ObjectRef]],
   (...)
    153     ]
    154 ):
    155     if get_distribution_mode() == DISTRIBUTION_MODE.PRODUCTION:
--> 156         return fed.get(object_refs)
    157     elif get_distribution_mode() == DISTRIBUTION_MODE.SIMULATION:
    158         return ray.get(object_refs)

File ~/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py:613, in get(fed_objects)
    611 if get_global_context() is not None:
    612     get_global_context().set_last_recevied_error(e)
--> 613 raise e

File ~/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py:602, in get(fed_objects)
    599         ray_refs.append(received_ray_object_ref)
    601 try:
--> 602     values = ray.get(ray_refs)
    603     if is_individual_id:
    604         values = values[0]

File ~/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/auto_init_hook.py:24, in wrap_auto_init.<locals>.auto_init_wrapper(*args, **kwargs)
     21 @wraps(fn)
     22 def auto_init_wrapper(*args, **kwargs):
     23     auto_init_ray()
---> 24     return fn(*args, **kwargs)

File ~/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/client_mode_hook.py:103, in client_mode_hook.<locals>.wrapper(*args, **kwargs)
    101     if func.__name__ != "init" or is_client_mode_enabled_by_default:
    102         return getattr(ray, func.__name__)(*args, **kwargs)
--> 103 return func(*args, **kwargs)

File ~/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/worker.py:2524, in get(object_refs, timeout)
   2522     worker.core_worker.dump_object_store_memory_usage()
   2523 if isinstance(value, RayTaskError):
-> 2524     raise value.as_instanceof_cause()
   2525 else:
   2526     raise value

RayTaskError(FedRemoteError): ray::ReceiverProxyActor.get_data() (pid=201684, ip=192.168.15.7, actor_id=57475061dc167106e064d65801000000, repr=<fed.proxy.barriers.ReceiverProxyActor object at 0x7f08dc26df60>)
  File "/root/anaconda3/envs/psi/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/root/anaconda3/envs/psi/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/proxy/barriers.py", line 236, in get_data
    raise data
fed.exceptions.FedRemoteError: FedRemoteError occurred at alice

Alice's Output:

KeyboardInterrupt
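  2. Return only a single result
    When I use the PIR feature, the result currently contains every match. If I only want the first match, would the overhead be smaller, and is there support for that? This seems to partly overlap with unlabeled PSI, and my need for it comes from the same scenario as unlabeled PSI.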
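To make the two questions above concrete, here is a minimal plaintext sketch of what each query mode reveals to the client. It contains no cryptography and does not call SecretFlow or APSI; the function names and the sample table are hypothetical and only illustrate the semantics being asked about.

```python
from typing import Dict, List, Set

# Hypothetical server table: key -> all labels stored for that key.
SERVER_TABLE: Dict[str, List[str]] = {
    "alice": ["US,NYC", "US,SF"],
    "bob": ["UK,London"],
}


def labeled_query(keys: List[str]) -> Dict[str, List[str]]:
    """Labeled mode: the client learns every label for each matching key."""
    return {k: SERVER_TABLE[k] for k in keys if k in SERVER_TABLE}


def first_match_query(keys: List[str]) -> Dict[str, str]:
    """Single-result mode: the client learns only the first label per matching key."""
    return {k: SERVER_TABLE[k][0] for k in keys if k in SERVER_TABLE}


def unlabeled_query(keys: List[str]) -> Set[str]:
    """Unlabeled mode: the client learns only which of its keys are present."""
    return {k for k in keys if k in SERVER_TABLE}
```

In this picture the unlabeled result carries no label payload at all, which is the intuition behind expecting it to be cheaper. Whether returning only the first match gives a comparable saving in the real protocol is exactly the open question here.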
winnylyc added the enhancement (New feature or request) label May 9, 2024
6fj assigned 6fj and qxzhou1010 May 9, 2024

6fj (Member) commented May 9, 2024

Thanks for your input. Please keep an eye on the status of this issue.

winnylyc (Author) commented

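My earlier report probably did not contain enough information 😓, and the error message from SecretNote was likely not accurate. In addition, the previous experiment had duplicate keys, so it could not have run in the first place.
I have regenerated the data without duplicate keys and ran the script directly with Python. Below I again set label_columns to [], but it still does not run. Does this mean there is no unlabeled PSI implementation yet?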
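Since duplicate keys were the problem in the first run, a quick check of the input CSV before setup can rule that cause out. The following is a minimal sketch using only the standard library; `find_duplicate_keys` is a hypothetical helper, not part of SecretFlow, and it assumes the CSV has a header row containing the key columns.

```python
import csv
from collections import Counter
from typing import Dict, List, Tuple


def find_duplicate_keys(csv_path: str, key_columns: List[str]) -> Dict[Tuple[str, ...], int]:
    """Return {key_tuple: count} for every key that occurs more than once."""
    with open(csv_path, newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        # Count each row by the tuple of its key-column values.
        counts = Counter(tuple(row[c] for c in key_columns) for row in reader)
    return {key: n for key, n in counts.items() if n > 1}
```

Calling it with the input file and `["name"]` as the key columns should return an empty dict for a deduplicated file.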
The results are below.
Sender(Server):

import secretflow as sf
import spu
import time

cluster_config = {
    'parties' : {
        'alice': {
            'address': '127.0.0.1:59179',
            'listen_addr': '0.0.0.0:59179'
        },
        'bob': {
            'address': '127.0.0.1:53341',
            'listen_addr': '0.0.0.0:53341'
        }
    },
    'self_party': 'alice'
}
sf.shutdown  # no-op as written: the function is referenced but never called
sf.init(address='local', cluster_config=cluster_config)
cluster_def = {
    "nodes": [
        {
            "party": "alice",
            "address": "127.0.0.1:45413"
        },
        {
            "party": "bob",
            "address": "127.0.0.1:47480"
        },
    ],
    "runtime_config": {
        "protocol": spu.spu_pb2.SEMI2K,
        "field": spu.spu_pb2.FM128
    },
}

spu = sf.SPU(
    cluster_def,
    link_desc={
        "connect_retry_times": 60,
        "connect_retry_interval_ms": 1000,
    }
)

npq = 1
spu.pir_setup(
    server="alice",
    input_path="/root/project/psi1/alice_exactpsi_1e6_unique.csv",
    key_columns=['name'],
    label_columns=[],
    oprf_key_path="/root/project/psi1/alice_oprf_key",
    setup_path=f"/root/project/psi1/alice_exactpsi_setup_1e6_len55_unique_npq{npq}",
    num_per_query=npq,
    label_max_len=55,
    bucket_size=1000000
)

Output:

/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
2024-05-23 18:43:48,666 INFO worker.py:1621 -- Started a local Ray instance.
2024-05-23 18:43:49.412 INFO api.py:233 [alice] -- [Anonymous_job] Started rayfed with {'CLUSTER_ADDRESSES': {'alice': '0.0.0.0:59179', 'bob': '127.0.0.1:53341'}, 'CURRENT_PARTY_NAME': 'alice', 'TLS_CONFIG': {}}
2024-05-23 18:43:50.036 INFO barriers.py:284 [alice] -- [Anonymous_job] Succeeded to create receiver proxy actor.
(ReceiverProxyActor pid=2406) 2024-05-23 18:43:50.032 INFO grpc_proxy.py:359 [alice] -- [Anonymous_job] ReceiverProxy binding port 59179, options: (('grpc.enable_retries', 1), ('grpc.so_reuseport', 0), ('grpc.max_send_message_length', 524288000), ('grpc.max_receive_message_length', 524288000), ('grpc.service_config', '{"methodConfig": [{"name": [{"service": "GrpcService"}], "retryPolicy": {"maxAttempts": 5, "initialBackoff": "5s", "maxBackoff": "30s", "backoffMultiplier": 2, "retryableStatusCodes": ["UNAVAILABLE"]}}]}'))...
(ReceiverProxyActor pid=2406) 2024-05-23 18:43:50.035 INFO grpc_proxy.py:379 [alice] -- [Anonymous_job] Successfully start Grpc service without credentials.
2024-05-23 18:43:50.674 INFO barriers.py:333 [alice] -- [Anonymous_job] SenderProxyActor has successfully created.
2024-05-23 18:43:50.674 INFO barriers.py:520 [alice] -- [Anonymous_job] Try ping ['bob'] at 0 attemp, up to 3600 attemps.
2024-05-23 18:43:53.678 INFO barriers.py:520 [alice] -- [Anonymous_job] Try ping ['bob'] at 1 attemp, up to 3600 attemps.
(SPURuntime(device_id=None, party=alice) pid=3107) WARNING:root:config=mode: MODE_SERVER_SETUP
(SPURuntime(device_id=None, party=alice) pid=3107) pir_protocol: PIR_PROTOCOL_KEYWORD_PIR_APSI
(SPURuntime(device_id=None, party=alice) pid=3107) pir_server_config {
(SPURuntime(device_id=None, party=alice) pid=3107)   input_path: "/root/project/psi1/alice_exactpsi_1e6_unique.csv"
(SPURuntime(device_id=None, party=alice) pid=3107)   setup_path: "/root/project/psi1/alice_exactpsi_setup_1e6_len55_unique_npq1"
(SPURuntime(device_id=None, party=alice) pid=3107)   key_columns: "name"
(SPURuntime(device_id=None, party=alice) pid=3107)   label_max_len: 55
(SPURuntime(device_id=None, party=alice) pid=3107)   bucket_size: 1000000
(SPURuntime(device_id=None, party=alice) pid=3107)   apsi_server_config {
(SPURuntime(device_id=None, party=alice) pid=3107)     oprf_key_path: "/root/project/psi1/alice_oprf_key"
(SPURuntime(device_id=None, party=alice) pid=3107)     num_per_query: 1
(SPURuntime(device_id=None, party=alice) pid=3107)   }
(SPURuntime(device_id=None, party=alice) pid=3107) }
(SPURuntime(device_id=None, party=alice) pid=3107)
(SPURuntime(device_id=None, party=alice) pid=3107) [2024-05-23 18:43:58.445] [info] [pir.cc:245] table_params hash_func_count:1
(SPURuntime(device_id=None, party=alice) pid=3107) [2024-05-23 18:43:58.445] [info] [pir.cc:247] table_params max_items_per_bin:55
(SPURuntime(device_id=None, party=alice) pid=3107) [2024-05-23 18:43:58.445] [info] [pir.cc:250] seal_params poly_modulus_degree:2048
(SPURuntime(device_id=None, party=alice) pid=3107) [2024-05-23 18:43:58.445] [info] [pir.cc:252] query_params query_powers size:55
(SPURuntime(device_id=None, party=alice) pid=3107) [2024-05-23 18:43:58.455] [info] [pir.cc:267] bucket:0 bucket_setup_path:/root/project/psi1/alice_exactpsi_setup_1e6_len55_unique_npq1/bucket_0
2024-05-23 18:43:58.476 WARNING cleanup.py:154 [alice] -- [Anonymous_job] Failed to send ObjectRef(80e22aed7718a125377d0994e1b53538ecef50370100000001000000) to bob, error: ray::SenderProxyActor.send() (pid=2464, ip=192.168.15.7, actor_id=377d0994e1b53538ecef503701000000, repr=<fed.proxy.barriers.SenderProxyActor object at 0x7f81b0409f00>)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.RayTaskError: ray::SPURuntime.pir_setup() (pid=3107, ip=192.168.15.7, actor_id=66e4abb0bac5fbda63689d7f01000000, repr=SPURuntime(device_id=None, party=alice))
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 1245, in pir_setup
    report = psi.pir(config)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/spu/psi.py", line 148, in pir
    report_str = libpsi.libs.pir(config.SerializeToString(), link)
RuntimeError: what:
        [external/psi/psi/utils/batch_provider.cc:160] unsupported.
stacktrace:
#0 psi::apsi::PirServerSetup()+0x7f6d8952b828
#1 psi::apsi::Launch()+0x7f6d895332f9
#2 psi::RunPir()+0x7f6d8943c003
#3 psi::BindLibs()::{lambda()#5}::operator()()+0x7f6d894365d5
#4 pybind11::cpp_function::initialize<>()::{lambda()#3}::_FUN()+0x7f6d894367e0
#5 pybind11::cpp_function::dispatcher()+0x7f6d89417fed
#6 cfunction_call+0x4fd907,upstream_seq_id: 5#0, downstream_seq_id: 6.
2024-05-23 18:43:58.476 INFO cleanup.py:161 [alice] -- [Anonymous_job] Sending error what:
        [external/psi/psi/utils/batch_provider.cc:160] unsupported.
stacktrace:
#0 psi::apsi::PirServerSetup()+0x7f6d8952b828
#1 psi::apsi::Launch()+0x7f6d895332f9
#2 psi::RunPir()+0x7f6d8943c003
#3 psi::BindLibs()::{lambda()#5}::operator()()+0x7f6d894365d5
#4 pybind11::cpp_function::initialize<>()::{lambda()#3}::_FUN()+0x7f6d894367e0
#5 pybind11::cpp_function::dispatcher()+0x7f6d89417fed
#6 cfunction_call+0x4fd907

 to bob.
2024-05-23 18:43:58.477 WARNING cleanup.py:127 [alice] -- [Anonymous_job] Signal SIGINT to exit.
2024-05-23 18:43:58.477 WARNING api.py:60 [alice] -- [Anonymous_job] Stop signal received (e.g. via SIGINT/Ctrl+C), try to shutdown fed. Press CTRL+C (or send SIGINT/SIGKILL/SIGTERM) to skip.
2024-05-23 18:43:58.477 WARNING api.py:325 [alice] -- [Anonymous_job] Shutdowning rayfed unintendedly...
2024-05-23 18:43:58.477 ERROR api.py:330 [alice] -- [Anonymous_job] Cross-silo sending error occured. ray::SenderProxyActor.send() (pid=2464, ip=192.168.15.7, actor_id=377d0994e1b53538ecef503701000000, repr=<fed.proxy.barriers.SenderProxyActor object at 0x7f81b0409f00>)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.RayTaskError: ray::SPURuntime.pir_setup() (pid=3107, ip=192.168.15.7, actor_id=66e4abb0bac5fbda63689d7f01000000, repr=SPURuntime(device_id=None, party=alice))
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 1245, in pir_setup
    report = psi.pir(config)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/spu/psi.py", line 148, in pir
    report_str = libpsi.libs.pir(config.SerializeToString(), link)
RuntimeError: what:
        [external/psi/psi/utils/batch_provider.cc:160] unsupported.
stacktrace:
#0 psi::apsi::PirServerSetup()+0x7f6d8952b828
#1 psi::apsi::Launch()+0x7f6d895332f9
#2 psi::RunPir()+0x7f6d8943c003
#3 psi::BindLibs()::{lambda()#5}::operator()()+0x7f6d894365d5
#4 pybind11::cpp_function::initialize<>()::{lambda()#3}::_FUN()+0x7f6d894367e0
#5 pybind11::cpp_function::dispatcher()+0x7f6d89417fed
#6 cfunction_call+0x4fd907
2024-05-23 18:43:58.477 INFO api.py:337 [alice] -- [Anonymous_job] No wait for data sending.
2024-05-23 18:43:58.478 INFO message_queue.py:70 [alice] -- [Anonymous_job] Notify message polling thread[ErrorSendingQueueThread] to exit.
2024-05-23 18:43:58.478 INFO api.py:352 [alice] -- [Anonymous_job] Shutdowned rayfed.
2024-05-23 18:43:58.479 CRITICAL api.py:356 [alice] -- [Anonymous_job] Exit now due to the previous error.
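The underlying error in the log above is the single line `[external/psi/psi/utils/batch_provider.cc:160] unsupported.`, repeated inside several wrappers. A small helper can pull such lines out of a long log. This is a sketch under the assumption that the cause always sits on the line right after a line ending in `what:`, as it does in this output; `root_causes` is a hypothetical name.

```python
import re
from typing import List

# Matches e.g. "[external/psi/psi/utils/batch_provider.cc:160] unsupported."
# A C++ file extension is required so timestamps and "[info]" tags are skipped.
_CAUSE = re.compile(
    r"\[(?P<file>[^\[\]\s]+\.(?:cc|cpp|h)):(?P<line>\d+)\]\s*(?P<msg>.*)"
)


def root_causes(log_text: str) -> List[str]:
    """Return distinct 'file:line message' entries found right after a 'what:' line."""
    seen = set()
    out: List[str] = []
    lines = log_text.splitlines()
    for i, line in enumerate(lines[:-1]):
        if not line.rstrip().endswith("what:"):
            continue
        m = _CAUSE.search(lines[i + 1])
        if m is None:
            continue
        entry = f"{m['file']}:{m['line']} {m['msg'].strip()}"
        if entry not in seen:
            seen.add(entry)
            out.append(entry)
    return out
```

Deduplicating keeps the report short even though the same `what:` block is logged once per wrapper.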

Receiver(Client):

import secretflow as sf
import spu
import time

cluster_config = {
    'parties' : {
        'alice': {
            'address': '127.0.0.1:59179',
            'listen_addr': '0.0.0.0:59179'
        },
        'bob': {
            'address': '127.0.0.1:53341',
            'listen_addr': '0.0.0.0:53341'
        }
    },
    'self_party': 'bob'
}
sf.shutdown  # no-op as written: the function is referenced but never called
sf.init(address='local', cluster_config=cluster_config)
cluster_def = {
    "nodes": [
        {
            "party": "alice",
            "address": "127.0.0.1:45413"
        },
        {
            "party": "bob",
            "address": "127.0.0.1:47480"
        },
    ],
    "runtime_config": {
        "protocol": spu.spu_pb2.SEMI2K,
        "field": spu.spu_pb2.FM128
    },
}

spu = sf.SPU(
    cluster_def,
    link_desc={
        "connect_retry_times": 60,
        "connect_retry_interval_ms": 1000,
    }
)

npq = 1
spu.pir_setup(
    server="alice",
    input_path="/root/project/psi1/alice_exactpsi_1e6_unique.csv",
    key_columns=['name'],
    label_columns=[],
    oprf_key_path="/root/project/psi1/alice_oprf_key",
    setup_path=f"/root/project/psi1/alice_exactpsi_setup_1e6_len55_unique_npq{npq}",
    num_per_query=npq,
    label_max_len=55,
    bucket_size=1000000
)

Output:

/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
2024-05-23 18:43:51,091 INFO worker.py:1621 -- Started a local Ray instance.
2024-05-23 18:43:51.803 INFO api.py:233 [bob] -- [Anonymous_job] Started rayfed with {'CLUSTER_ADDRESSES': {'alice': '127.0.0.1:59179', 'bob': '0.0.0.0:53341'}, 'CURRENT_PARTY_NAME': 'bob', 'TLS_CONFIG': {}}
2024-05-23 18:43:52.553 INFO barriers.py:284 [bob] -- [Anonymous_job] Succeeded to create receiver proxy actor.
(ReceiverProxyActor pid=2981) 2024-05-23 18:43:52.548 INFO grpc_proxy.py:359 [bob] -- [Anonymous_job] ReceiverProxy binding port 53341, options: (('grpc.enable_retries', 1), ('grpc.so_reuseport', 0), ('grpc.max_send_message_length', 524288000), ('grpc.max_receive_message_length', 524288000), ('grpc.service_config', '{"methodConfig": [{"name": [{"service": "GrpcService"}], "retryPolicy": {"maxAttempts": 5, "initialBackoff": "5s", "maxBackoff": "30s", "backoffMultiplier": 2, "retryableStatusCodes": ["UNAVAILABLE"]}}]}'))...
(ReceiverProxyActor pid=2981) 2024-05-23 18:43:52.551 INFO grpc_proxy.py:379 [bob] -- [Anonymous_job] Successfully start Grpc service without credentials.
2024-05-23 18:43:53.244 INFO barriers.py:333 [bob] -- [Anonymous_job] SenderProxyActor has successfully created.
2024-05-23 18:43:53.244 INFO barriers.py:520 [bob] -- [Anonymous_job] Try ping ['alice'] at 0 attemp, up to 3600 attemps.
2024-05-23 18:43:58.482 WARNING api.py:607 [bob] -- [Anonymous_job] Encounter RemoteError happend in other parties, error message: FedRemoteError occurred at alice
Traceback (most recent call last):
  File "/root/project/psi2/sf_connect_unlabeled.py", line 46, in <module>
    spu.pir_setup(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 1990, in pir_setup
    return dispatch(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/register.py", line 111, in dispatch
    return _registrar.dispatch(self.device_type, name, self, *args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/register.py", line 80, in dispatch
    return self._ops[device_type][name](*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/kernels/spu.py", line 521, in pir_setup
    return sfd.get(res)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/distributed/primitive.py", line 156, in get
    return fed.get(object_refs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 613, in get
    raise e
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 602, in get
    values = ray.get(ray_refs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/auto_init_hook.py", line 24, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/worker.py", line 2524, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(FedRemoteError): ray::ReceiverProxyActor.get_data() (pid=2981, ip=192.168.15.7, actor_id=4f32dff258c018cdbff1ac3001000000, repr=<fed.proxy.barriers.ReceiverProxyActor object at 0x7f98089b5d50>)
  File "/root/anaconda3/envs/psi/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/root/anaconda3/envs/psi/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/proxy/barriers.py", line 236, in get_data
    raise data
fed.exceptions.FedRemoteError: FedRemoteError occurred at alice
^C2024-05-23 18:45:37.998 WARNING api.py:60 [bob] -- [Anonymous_job] Stop signal received (e.g. via SIGINT/Ctrl+C), try to shutdown fed. Press CTRL+C (or send SIGINT/SIGKILL/SIGTERM) to skip.
2024-05-23 18:45:37.998 WARNING api.py:325 [bob] -- [Anonymous_job] Shutdowning rayfed unintendedly...
2024-05-23 18:45:37.999 INFO api.py:337 [bob] -- [Anonymous_job] No wait for data sending.
2024-05-23 18:45:38.000 INFO message_queue.py:70 [bob] -- [Anonymous_job] Notify message polling thread[DataSendingQueueThread] to exit.
2024-05-23 18:45:38.000 INFO message_queue.py:70 [bob] -- [Anonymous_job] Notify message polling thread[ErrorSendingQueueThread] to exit.
2024-05-23 18:45:38.000 INFO api.py:352 [bob] -- [Anonymous_job] Shutdowned rayfed.
2024-05-23 18:45:38.000 CRITICAL api.py:356 [bob] -- [Anonymous_job] Exit now due to the previous error.
Exception ignored in: <module 'threading' from '/root/anaconda3/envs/psi/lib/python3.10/threading.py'>
Traceback (most recent call last):
  File "/root/anaconda3/envs/psi/lib/python3.10/threading.py", line 1567, in _shutdown
    lock.acquire()
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 65, in _signal_handler
    _shutdown(intended=False)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 357, in _shutdown
    sys.exit(1)
SystemExit: 1

6fj (Member) commented May 24, 2024

Hi @winnylyc,

Could you also try setting label_max_len to 0?

winnylyc (Author) commented

Thanks for your reply!
I gave it a try, but it still does not seem to work 😓. The code and output are below.
Sender(Server):

import secretflow as sf
import spu
import time

cluster_config = {
    'parties' : {
        'alice': {
            'address': '127.0.0.1:59179',
            'listen_addr': '0.0.0.0:59179'
        },
        'bob': {
            'address': '127.0.0.1:53341',
            'listen_addr': '0.0.0.0:53341'
        }
    },
    'self_party': 'alice'
}
sf.shutdown  # no-op as written: the function is referenced but never called
sf.init(address='local', cluster_config=cluster_config)
cluster_def = {
    "nodes": [
        {
            "party": "alice",
            "address": "127.0.0.1:45413"
        },
        {
            "party": "bob",
            "address": "127.0.0.1:47480"
        },
    ],
    "runtime_config": {
        "protocol": spu.spu_pb2.SEMI2K,
        "field": spu.spu_pb2.FM128
    },
}

spu = sf.SPU(
    cluster_def,
    link_desc={
        "connect_retry_times": 60,
        "connect_retry_interval_ms": 1000,
    }
)

npq = 1
spu.pir_setup(
    server="alice",
    input_path="/root/project/psi1/alice_exactpsi_1e6_unique.csv",
    key_columns=['name'],
    label_columns=[],
    oprf_key_path="/root/project/psi1/alice_oprf_key",
    setup_path=f"/root/project/psi1/alice_exactpsi_setup_1e6_len55_unique_npq{npq}",
    num_per_query=npq,
    label_max_len=0,
    bucket_size=1000000
)

Output:

/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
2024-05-24 10:29:06,237 INFO worker.py:1621 -- Started a local Ray instance.
2024-05-24 10:29:07.584 INFO api.py:233 [alice] -- [Anonymous_job] Started rayfed with {'CLUSTER_ADDRESSES': {'alice': '0.0.0.0:59179', 'bob': '127.0.0.1:53341'}, 'CURRENT_PARTY_NAME': 'alice', 'TLS_CONFIG': {}}
2024-05-24 10:29:08.255 INFO barriers.py:284 [alice] -- [Anonymous_job] Succeeded to create receiver proxy actor.
(ReceiverProxyActor pid=1837) 2024-05-24 10:29:08.251 INFO grpc_proxy.py:359 [alice] -- [Anonymous_job] ReceiverProxy binding port 59179, options: (('grpc.enable_retries', 1), ('grpc.so_reuseport', 0), ('grpc.max_send_message_length', 524288000), ('grpc.max_receive_message_length', 524288000), ('grpc.service_config', '{"methodConfig": [{"name": [{"service": "GrpcService"}], "retryPolicy": {"maxAttempts": 5, "initialBackoff": "5s", "maxBackoff": "30s", "backoffMultiplier": 2, "retryableStatusCodes": ["UNAVAILABLE"]}}]}'))...
(ReceiverProxyActor pid=1837) 2024-05-24 10:29:08.253 INFO grpc_proxy.py:379 [alice] -- [Anonymous_job] Successfully start Grpc service without credentials.
2024-05-24 10:29:08.940 INFO barriers.py:333 [alice] -- [Anonymous_job] SenderProxyActor has successfully created.
2024-05-24 10:29:08.940 INFO barriers.py:520 [alice] -- [Anonymous_job] Try ping ['bob'] at 0 attemp, up to 3600 attemps.
(SPURuntime(device_id=None, party=alice) pid=2042) WARNING:root:config=mode: MODE_SERVER_SETUP
(SPURuntime(device_id=None, party=alice) pid=2042) pir_protocol: PIR_PROTOCOL_KEYWORD_PIR_APSI
(SPURuntime(device_id=None, party=alice) pid=2042) pir_server_config {
(SPURuntime(device_id=None, party=alice) pid=2042)   input_path: "/root/project/psi1/alice_exactpsi_1e6_unique.csv"
(SPURuntime(device_id=None, party=alice) pid=2042)   setup_path: "/root/project/psi1/alice_exactpsi_setup_1e6_len55_unique_npq1"
(SPURuntime(device_id=None, party=alice) pid=2042)   key_columns: "name"
(SPURuntime(device_id=None, party=alice) pid=2042)   bucket_size: 1000000
(SPURuntime(device_id=None, party=alice) pid=2042)   apsi_server_config {
(SPURuntime(device_id=None, party=alice) pid=2042)     oprf_key_path: "/root/project/psi1/alice_oprf_key"
(SPURuntime(device_id=None, party=alice) pid=2042)     num_per_query: 1
(SPURuntime(device_id=None, party=alice) pid=2042)   }
(SPURuntime(device_id=None, party=alice) pid=2042) }
(SPURuntime(device_id=None, party=alice) pid=2042)
2024-05-24 10:29:11.165 WARNING cleanup.py:154 [alice] -- [Anonymous_job] Failed to send ObjectRef(8849b62d89cb30f9c4aba1b6791ea8e6170e26cf0100000001000000) to bob, error: ray::SenderProxyActor.send() (pid=1945, ip=192.168.15.7, actor_id=c4aba1b6791ea8e6170e26cf01000000, repr=<fed.proxy.barriers.SenderProxyActor object at 0x7fd04d2e9f90>)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.RayTaskError: ray::SPURuntime.pir_setup() (pid=2042, ip=192.168.15.7, actor_id=29ba65c288e8d8664bc7d8d201000000, repr=SPURuntime(device_id=None, party=alice))
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 1245, in pir_setup
    report = psi.pir(config)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/spu/psi.py", line 148, in pir
    report_str = libpsi.libs.pir(config.SerializeToString(), link)
RuntimeError: what:
        [external/psi/psi/utils/batch_provider.cc:160] unsupported.
stacktrace:
#0 psi::apsi::PirServerSetup()+0x7f7a76196828
#1 psi::apsi::Launch()+0x7f7a7619e2f9
#2 psi::RunPir()+0x7f7a760a7003
#3 psi::BindLibs()::{lambda()#5}::operator()()+0x7f7a760a15d5
#4 pybind11::cpp_function::initialize<>()::{lambda()#3}::_FUN()+0x7f7a760a17e0
#5 pybind11::cpp_function::dispatcher()+0x7f7a76082fed
#6 cfunction_call+0x4fd907,upstream_seq_id: 5#0, downstream_seq_id: 6.
2024-05-24 10:29:11.165 INFO cleanup.py:161 [alice] -- [Anonymous_job] Sending error what:
        [external/psi/psi/utils/batch_provider.cc:160] unsupported.
stacktrace:
#0 psi::apsi::PirServerSetup()+0x7f7a76196828
#1 psi::apsi::Launch()+0x7f7a7619e2f9
#2 psi::RunPir()+0x7f7a760a7003
#3 psi::BindLibs()::{lambda()#5}::operator()()+0x7f7a760a15d5
#4 pybind11::cpp_function::initialize<>()::{lambda()#3}::_FUN()+0x7f7a760a17e0
#5 pybind11::cpp_function::dispatcher()+0x7f7a76082fed
#6 cfunction_call+0x4fd907

 to bob.
2024-05-24 10:29:11.166 WARNING cleanup.py:127 [alice] -- [Anonymous_job] Signal SIGINT to exit.
2024-05-24 10:29:11.166 WARNING api.py:60 [alice] -- [Anonymous_job] Stop signal received (e.g. via SIGINT/Ctrl+C), try to shutdown fed. Press CTRL+C (or send SIGINT/SIGKILL/SIGTERM) to skip.
2024-05-24 10:29:11.166 WARNING api.py:325 [alice] -- [Anonymous_job] Shutdowning rayfed unintendedly...
2024-05-24 10:29:11.167 ERROR api.py:330 [alice] -- [Anonymous_job] Cross-silo sending error occured. ray::SenderProxyActor.send() (pid=1945, ip=192.168.15.7, actor_id=c4aba1b6791ea8e6170e26cf01000000, repr=<fed.proxy.barriers.SenderProxyActor object at 0x7fd04d2e9f90>)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.RayTaskError: ray::SPURuntime.pir_setup() (pid=2042, ip=192.168.15.7, actor_id=29ba65c288e8d8664bc7d8d201000000, repr=SPURuntime(device_id=None, party=alice))
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 1245, in pir_setup
    report = psi.pir(config)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/spu/psi.py", line 148, in pir
    report_str = libpsi.libs.pir(config.SerializeToString(), link)
RuntimeError: what:
        [external/psi/psi/utils/batch_provider.cc:160] unsupported.
stacktrace:
#0 psi::apsi::PirServerSetup()+0x7f7a76196828
#1 psi::apsi::Launch()+0x7f7a7619e2f9
#2 psi::RunPir()+0x7f7a760a7003
#3 psi::BindLibs()::{lambda()#5}::operator()()+0x7f7a760a15d5
#4 pybind11::cpp_function::initialize<>()::{lambda()#3}::_FUN()+0x7f7a760a17e0
#5 pybind11::cpp_function::dispatcher()+0x7f7a76082fed
#6 cfunction_call+0x4fd907
2024-05-24 10:29:11.167 INFO api.py:337 [alice] -- [Anonymous_job] No wait for data sending.
2024-05-24 10:29:11.168 INFO message_queue.py:70 [alice] -- [Anonymous_job] Notify message polling thread[ErrorSendingQueueThread] to exit.
2024-05-24 10:29:11.169 INFO api.py:352 [alice] -- [Anonymous_job] Shutdowned rayfed.
2024-05-24 10:29:11.169 CRITICAL api.py:356 [alice] -- [Anonymous_job] Exit now due to the previous error.
(SPURuntime(device_id=None, party=alice) pid=2042) [2024-05-24 10:29:11.138] [info] [pir.cc:245] table_params hash_func_count:1
(SPURuntime(device_id=None, party=alice) pid=2042) [2024-05-24 10:29:11.138] [info] [pir.cc:247] table_params max_items_per_bin:55
(SPURuntime(device_id=None, party=alice) pid=2042) [2024-05-24 10:29:11.138] [info] [pir.cc:250] seal_params poly_modulus_degree:2048
(SPURuntime(device_id=None, party=alice) pid=2042) [2024-05-24 10:29:11.138] [info] [pir.cc:252] query_params query_powers size:55
(SPURuntime(device_id=None, party=alice) pid=2042) [2024-05-24 10:29:11.150] [info] [pir.cc:267] bucket:0 bucket_setup_path:/root/project/psi1/alice_exactpsi_setup_1e6_len55_unique_npq1/bucket_0

Receiver(Client):

import secretflow as sf
import spu
import time

cluster_config = {
    'parties' : {
        'alice': {
            'address': '127.0.0.1:59179',
            'listen_addr': '0.0.0.0:59179'
        },
        'bob': {
            'address': '127.0.0.1:53341',
            'listen_addr': '0.0.0.0:53341'
        }
    },
    'self_party': 'bob'
}
sf.shutdown()  # actually call shutdown; the bare attribute reference was a no-op
sf.init(address='local', cluster_config=cluster_config)
cluster_def = {
    "nodes": [
        {
            "party": "alice",
            "address": "127.0.0.1:45413"
        },
        {
            "party": "bob",
            "address": "127.0.0.1:47480"
        },
    ],
    "runtime_config": {
        "protocol": spu.spu_pb2.SEMI2K,
        "field": spu.spu_pb2.FM128
    },
}

spu = sf.SPU(
    cluster_def,
    link_desc={
        "connect_retry_times": 60,
        "connect_retry_interval_ms": 1000,
    }
)

npq = 1
spu.pir_setup(
    server="alice",
    input_path="/root/project/psi1/alice_exactpsi_1e6_unique.csv",
    key_columns=['name'],
    label_columns=[],
    oprf_key_path="/root/project/psi1/alice_oprf_key",
    setup_path=f"/root/project/psi1/alice_exactpsi_setup_1e6_len55_unique_npq{npq}",
    num_per_query=npq,
    label_max_len=0,
    bucket_size=1000000
)
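
The call above sets `label_columns=[]` and `label_max_len=0`, which is the configuration that ends in `unsupported` on the sender side. A minimal sketch of a pre-flight check that flags this before the job is submitted; the helper name `check_pir_setup_args` is hypothetical and not part of SecretFlow, and the rules it encodes are assumptions drawn only from the behavior reported in this thread:

```python
def check_pir_setup_args(key_columns, label_columns, label_max_len):
    """Return a list of human-readable problems; an empty list means none were found.

    Hypothetical helper: it only mirrors what this thread observed and does not
    reflect any validation performed by SecretFlow or the PSI library itself.
    """
    problems = []
    if not key_columns:
        problems.append("key_columns is empty")
    if not label_columns:
        # Observed in this thread: pir_setup with no label columns fails with
        # "[external/psi/psi/utils/batch_provider.cc:160] unsupported."
        problems.append("label_columns is empty (unlabeled setup was reported as unsupported)")
    elif label_max_len <= 0:
        # Assumption: a labeled setup needs room for at least one label byte.
        problems.append("label_max_len should be positive when label_columns is set")
    return problems


if __name__ == "__main__":
    # Same arguments as the failing call above.
    for problem in check_pir_setup_args(["name"], [], 0):
        print("pir_setup argument issue:", problem)
```

Running the check with the arguments from this report prints a single issue about the empty `label_columns`, so the failure surfaces locally instead of as a remote error on the other party.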

Output:

/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
2024-05-24 10:29:06,237 INFO worker.py:1621 -- Started a local Ray instance.
2024-05-24 10:29:07.600 INFO api.py:233 [bob] -- [Anonymous_job] Started rayfed with {'CLUSTER_ADDRESSES': {'alice': '127.0.0.1:59179', 'bob': '0.0.0.0:53341'}, 'CURRENT_PARTY_NAME': 'bob', 'TLS_CONFIG': {}}
2024-05-24 10:29:08.255 INFO barriers.py:284 [bob] -- [Anonymous_job] Succeeded to create receiver proxy actor.
(ReceiverProxyActor pid=1838) 2024-05-24 10:29:08.251 INFO grpc_proxy.py:359 [bob] -- [Anonymous_job] ReceiverProxy binding port 53341, options: (('grpc.enable_retries', 1), ('grpc.so_reuseport', 0), ('grpc.max_send_message_length', 524288000), ('grpc.max_receive_message_length', 524288000), ('grpc.service_config', '{"methodConfig": [{"name": [{"service": "GrpcService"}], "retryPolicy": {"maxAttempts": 5, "initialBackoff": "5s", "maxBackoff": "30s", "backoffMultiplier": 2, "retryableStatusCodes": ["UNAVAILABLE"]}}]}'))...
(ReceiverProxyActor pid=1838) 2024-05-24 10:29:08.254 INFO grpc_proxy.py:379 [bob] -- [Anonymous_job] Successfully start Grpc service without credentials.
2024-05-24 10:29:08.936 INFO barriers.py:333 [bob] -- [Anonymous_job] SenderProxyActor has successfully created.
2024-05-24 10:29:08.937 INFO barriers.py:520 [bob] -- [Anonymous_job] Try ping ['alice'] at 0 attemp, up to 3600 attemps.
2024-05-24 10:29:11.172 WARNING api.py:607 [bob] -- [Anonymous_job] Encounter RemoteError happend in other parties, error message: FedRemoteError occurred at alice
Traceback (most recent call last):
  File "/root/project/psi2/sf_connect_unlabeled.py", line 46, in <module>
    spu.pir_setup(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 1990, in pir_setup
    return dispatch(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/register.py", line 111, in dispatch
    return _registrar.dispatch(self.device_type, name, self, *args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/register.py", line 80, in dispatch
    return self._ops[device_type][name](*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/kernels/spu.py", line 521, in pir_setup
    return sfd.get(res)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/distributed/primitive.py", line 156, in get
    return fed.get(object_refs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 613, in get
    raise e
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 602, in get
    values = ray.get(ray_refs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/auto_init_hook.py", line 24, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/worker.py", line 2524, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(FedRemoteError): ray::ReceiverProxyActor.get_data() (pid=1838, ip=192.168.15.7, actor_id=cb60c17fef717f98f61a8d5f01000000, repr=<fed.proxy.barriers.ReceiverProxyActor object at 0x7fd726c7dd80>)
  File "/root/anaconda3/envs/psi/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/root/anaconda3/envs/psi/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/proxy/barriers.py", line 236, in get_data
    raise data
fed.exceptions.FedRemoteError: FedRemoteError occurred at alice
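
The receiver-side traceback only reports `FedRemoteError occurred at alice`; the actual cause is the `what:` line under `RuntimeError` in the sender's log. A minimal sketch for pulling that line out of a captured log, assuming the `[file:line] message` layout seen in this thread (the helper name `extract_root_cause` is hypothetical):

```python
import re

# Matches the "[path/to/file.cc:123] message" line that follows "RuntimeError: what:".
_WHAT_RE = re.compile(
    r"RuntimeError: what:\s*\n\s*\[(?P<file>[^\]:]+):(?P<line>\d+)\]\s*(?P<msg>.*)"
)


def extract_root_cause(log_text):
    """Return (source_file, line_number, message), or None if no such line exists."""
    match = _WHAT_RE.search(log_text)
    if match is None:
        return None
    return match.group("file"), int(match.group("line")), match.group("msg").strip()


# Excerpt copied from the sender (alice) log in this issue.
sample = """RuntimeError: what:
        [external/psi/psi/utils/batch_provider.cc:160] unsupported.
stacktrace:
#0 psi::apsi::PirServerSetup()+0x7f7a76196828
"""

if __name__ == "__main__":
    print(extract_root_cause(sample))
    # ('external/psi/psi/utils/batch_provider.cc', 160, 'unsupported.')
```

The sketch only locates the message; it makes no attempt to interpret the stack frames that follow it.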

@6fj
Member

6fj commented May 24, 2024

hi @winnylyc

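Thanks for the feedback. We are currently reworking the PIR code and will address the issues raised here as part of that work. We expect to finish by the end of June.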

@winnylyc
Author

Thank you for the reply! Looking forward to the PIR rework!
