
Commit

Issue/94/fix memory lack (#95)
* add the code to fix the memory issue, add a thread timer to print logs during the streaming process (a timer sketch follows this list)

* add the code to fix the memory issue, add a thread timer to print logs during the streaming process

* gcn stream now listens only to significant gw events

* move a function

* broadcast variables in global context

* fix globals

* fix bug

* fix bug

* fix function issue

* ci run locally

* fix kwargs call

* fix type problem

* swap the broadcast join (a broadcast-join sketch follows the changed-files summary below)

* Issue/96/offline refactoring (#97)

* large refactoring of fink_mm

* pep8

* refactor in progress

* online test ok

* add offline test, fix pep8

* offline test ok

* move ztf_join_gcn from online to fink_mm directory, remove the old offline

* fix pep8

* fix init test

* fix fun_utils test

* fix pep8

* fix join test

* fix join test

* fix distrib test

* fix distrib test

* fix distrib test

* fix distrib test

* fix distrib test

* hopefully fix test

* improve the skymap loading from hdfs, fix offline test

* fix gcn_stream

* fix conf file for test

* fix conf file for test
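Below is a minimal, self-contained sketch of the kind of repeating timer thread mentioned in the first commit message: a background thread that prints a heartbeat log while a Spark streaming query runs. The interval, logger name, and sleep placeholder are illustrative assumptions, not fink_mm's actual implementation.

```python
# Minimal sketch of a repeating timer thread that logs progress during a
# streaming job. Names, interval, and the sleep placeholder are illustrative
# only; they do not reproduce fink_mm's actual code.
import logging
import threading
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("stream-monitor")


def log_progress(interval: float, stop_event: threading.Event) -> None:
    """Print a heartbeat log every `interval` seconds until stop_event is set."""
    start = time.time()
    while not stop_event.wait(interval):  # returns True only once stop_event is set
        logger.info("streaming job alive for %.0f s", time.time() - start)


stop_event = threading.Event()
monitor = threading.Thread(target=log_progress, args=(10.0, stop_event), daemon=True)
monitor.start()

# ... the Spark streaming query would be started and awaited here ...
time.sleep(25)  # placeholder for query.awaitTermination(timeout)

stop_event.set()
monitor.join()
```

Using stop_event.wait(interval) rather than time.sleep lets the monitoring thread stop promptly once the streaming query ends.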
FusRoman authored Oct 4, 2023
1 parent 5f568f9 commit 5b4e5ff
Showing 27 changed files with 1,069 additions and 1,225 deletions.
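Several commits above move broadcast variables into the global context and swap the broadcast join. As a point of reference, here is a hedged PySpark sketch of a broadcast-hint join between a large ZTF alert DataFrame and a small GCN DataFrame; the column names, toy data, and box-shaped match condition are invented and do not reproduce fink_mm's ztf_join_gcn logic.

```python
# Hedged sketch of a Spark broadcast join: the small GCN table is shipped to
# every executor instead of being shuffled. Columns and data are invented.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local[2]").appName("broadcast_sketch").getOrCreate()

# Large side: a toy stand-in for the ZTF alert DataFrame.
ztf = spark.createDataFrame(
    [("ZTF23aaaaaaa", 210.5, -11.2), ("ZTF23aaaaaab", 33.1, 48.9)],
    ["objectId", "ra", "dec"],
)
# Small side: a toy stand-in for the GCN event DataFrame.
gcn = spark.createDataFrame(
    [("S230518h", 210.0, -11.0, 5.0)],
    ["triggerId", "gcn_ra", "gcn_dec", "err_deg"],
)

# Broadcasting the small GCN side avoids shuffling the large ZTF side;
# the box-shaped sky condition is only illustrative, not a real cross-match.
match_cond = (F.abs(ztf.ra - gcn.gcn_ra) < gcn.err_deg) & (
    F.abs(ztf.dec - gcn.gcn_dec) < gcn.err_deg
)
joined = ztf.join(F.broadcast(gcn), on=match_cond, how="inner")
joined.show()

spark.stop()
```

Broadcasting the small side is usually the motivation for choosing which DataFrame to wrap in the broadcast hint when one side of the join is orders of magnitude smaller than the other.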
2 changes: 1 addition & 1 deletion .flake8
@@ -10,7 +10,7 @@ exclude =
     build,
     dist
 per-file-ignores =
-    ../fink-mm/fink_mm/online/ztf_join_gcn.py:W503,E402
+    ../fink-mm/fink_mm/ztf_join_gcn.py:W503,E402
     ../fink-mm/fink_mm/offline/spark_offline.py:W503,W605
     ../fink-mm/fink_mm/utils/fun_utils.py:F811
     ../fink-mm/fink_mm/distribution/distribution.py:W503
2 changes: 1 addition & 1 deletion fink_mm/__init__.py
@@ -12,6 +12,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-__version__ = "0.14.3"
+__version__ = "0.15"
 __distribution_schema_version__ = "1.3"
 __observatory_schema_version__ = "1.1"
4 changes: 2 additions & 2 deletions fink_mm/conf/distribute_for_test.conf
@@ -26,7 +26,7 @@ hbase_catalog=/home/roman.le-montagner/fink-broker/catalogs_hbase/ztf.jd.json
 # port are the port where the hdfs driver listen
 # user are the name of the hdfs user
 [HDFS]
-host=
+host=127.0.0.1
 port=
 user=

@@ -71,7 +71,7 @@ username_writer=toto
 password_writer=tata

 [ADMIN]
-verbose=False
+debug=False
 # Healpix map resolution, better if a power of 2
 NSIDE=4
6 changes: 3 additions & 3 deletions fink_mm/conf/fink_mm.conf
@@ -13,7 +13,7 @@ hdfs_gcn_storage=/user/roman.le-montagner/gcn_storage/raw
 # They can be in local FS (/path/ or files:///path/) or
 # in distributed FS (e.g. hdfs:///path/).
 # Be careful though to have enough disk space!
-online_ztf_data_prefix=fink_mm/test/test_data/ztf_test/online
+online_ztf_data_prefix=fink_mm/test/test_data/ztf_test

 # Prefix path on disk to save GRB join ZTF data (work for both online and offline).
 online_grb_data_prefix=fink_mm/test/test_output
@@ -26,7 +26,7 @@ hbase_catalog=/home/roman.le-montagner/fink-broker/catalogs_hbase/ztf.jd.json
 # port are the port where the hdfs driver listen
 # user are the name of the hdfs user
 [HDFS]
-host=
+host=127.0.0.1
 port=
 user=

@@ -71,7 +71,7 @@ username_writer=toto
 password_writer=tata

 [ADMIN]
-verbose=False
+debug=True
 # Healpix map resolution, better if a power of 2
 NSIDE=4
6 changes: 3 additions & 3 deletions fink_mm/conf/integration.conf
@@ -13,7 +13,7 @@ hdfs_gcn_storage=fink_mm/ci_gcn_test
 # They can be in local FS (/path/ or files:///path/) or
 # in distributed FS (e.g. hdfs:///path/).
 # Be careful though to have enough disk space!
-online_ztf_data_prefix=fink_mm/test/test_data/ztf_test/online
+online_ztf_data_prefix=fink_mm/test/test_data/ztf_test

 # Prefix path on disk to save GRB join ZTF data (work for both online and offline).
 online_grb_data_prefix=fink_mm/ci_join_test
@@ -26,7 +26,7 @@ hbase_catalog=$FINK_HOME/catalogs_hbase/ztf.jd.json
 # port are the port where the hdfs driver listen
 # user are the name of the hdfs user
 [HDFS]
-host=
+host=127.0.0.1
 port=
 user=

@@ -71,7 +71,7 @@ username_writer=toto
 password_writer=tata

 [ADMIN]
-verbose=True
+debug=True
 # Healpix map resolution, better if a power of 2
 NSIDE=4
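The three configuration diffs above rename the [ADMIN] verbose option to debug and set a local [HDFS] host. A short sketch of reading those options with Python's standard configparser, assuming only the section and option names shown in the diffs; fink_mm's own get_config helper may read the file differently.

```python
# Reading the renamed [ADMIN] debug flag and the new [HDFS] host with the
# standard library; fink_mm's get_config helper may differ from this sketch.
from configparser import ConfigParser

config = ConfigParser()
config.read("fink_mm/conf/integration.conf")  # path taken from the diff above

hdfs_host = config["HDFS"]["host"]            # "127.0.0.1" after this change
debug = config.getboolean("ADMIN", "debug")   # option was called "verbose" before
nside = config.getint("ADMIN", "NSIDE")       # healpix resolution, e.g. 4

print(hdfs_host, debug, nside)
```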
14 changes: 9 additions & 5 deletions fink_mm/conftest.py
@@ -166,11 +166,15 @@ def init_LVK(doctest_namespace):
 @pytest.fixture(autouse=True, scope="session")
 def init_spark(doctest_namespace):
     from astropy.time import Time
+    from fink_mm.utils.application import DataMode
+    import pyspark.sql.functions as sql_func

     online_output_tempdir = tempfile.TemporaryDirectory()
     doctest_namespace["online_output_tempdir"] = online_output_tempdir

     doctest_namespace["Time"] = Time
+    doctest_namespace["DataMode"] = DataMode
+    doctest_namespace["sql_func"] = sql_func

     grb_data = "fink_mm/test/test_data/gcn_test/raw/year=2019/month=09/day=03"
     gw_data = "fink_mm/test/test_data/S230518h_0_test"
@@ -184,15 +188,15 @@ def init_spark(doctest_namespace):
     doctest_namespace["join_data"] = join_data
     doctest_namespace["alert_data"] = alert_data

-    ztf_datatest = "fink_mm/test/test_data/ztf_test/online"
+    ztf_datatest = "fink_mm/test/test_data/ztf_test"
     gcn_datatest = "fink_mm/test/test_data/gcn_test/raw"
-    join_data_test = "fink_mm/test/test_data/online"
-    offline_join_data_test = "fink_mm/test/test_data/offline_datatest.parquet"
+    online_data_test = "fink_mm/test/test_data/online"
+    offline_data_test = "fink_mm/test/test_data/offline"

     doctest_namespace["ztf_datatest"] = ztf_datatest
     doctest_namespace["gcn_datatest"] = gcn_datatest
-    doctest_namespace["join_data_test"] = join_data_test
-    doctest_namespace["offline_data_test"] = offline_join_data_test
+    doctest_namespace["online_data_test"] = online_data_test
+    doctest_namespace["offline_data_test"] = offline_data_test

     fink_home = os.environ["FINK_HOME"]
     hbase_catalog = fink_home + "/catalogs_hbase/ztf.jd.json"
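For context, the conftest changes rely on pytest's doctest_namespace fixture, which injects names into the globals of collected doctests. A standalone sketch of the pattern (not fink_mm's actual conftest):

```python
# conftest.py — standalone sketch of the doctest_namespace pattern, not fink_mm's file.
import pytest


@pytest.fixture(autouse=True, scope="session")
def add_doctest_names(doctest_namespace):
    # Every name stored here becomes a global inside collected doctests, so a
    # doctest can use `ztf_datatest` or `sql_func` without importing anything.
    import pyspark.sql.functions as sql_func

    doctest_namespace["sql_func"] = sql_func
    doctest_namespace["ztf_datatest"] = "fink_mm/test/test_data/ztf_test"
```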
19 changes: 7 additions & 12 deletions fink_mm/distribution/distribution.py
@@ -231,7 +231,8 @@ def launch_distribution(arguments):
     >>> launch_distribution({
     ... "--config" : "fink_mm/conf/distribute_for_test.conf",
     ... "--night" : "20190903",
-    ... "--exit_after" : 30
+    ... "--exit_after" : 30,
+    ... "--verbose" : False
     ... })
     >>> consumer = AlertConsumer(topics, myconfig)
@@ -251,7 +252,7 @@
     config = get_config(arguments)
     logger = init_logging()

-    verbose = return_verbose_level(config, logger)
+    verbose = return_verbose_level(arguments, config, logger)

     spark_submit = read_and_build_spark_submit(config, logger)

@@ -272,6 +273,7 @@
         _,
         _,
         _,
+        _,
         kafka_broker,
         username_writer,
         password_writer,
@@ -297,20 +299,13 @@
         external_files,
     )

-    process = subprocess.Popen(
-        spark_submit,
-        stdout=subprocess.PIPE,
-        stderr=subprocess.PIPE,
-        universal_newlines=True,
-        shell=True,
-    )
+    completed_process = subprocess.run(spark_submit, shell=True, capture_output=True)

-    stdout, stderr = process.communicate()
-    if process.returncode != 0:  # pragma: no cover
+    if completed_process.returncode != 0:  # pragma: no cover
         logger.error(
             "fink-mm distribution stream spark application has ended with a non-zero returncode.\
             \n\t cause:\n\t\t{}\n\t\t{}\n\n\n{}\n\n".format(
-                stdout, stderr, spark_submit
+                completed_process.stdout, completed_process.stderr, spark_submit
             )
         )
         exit(1)
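The last hunk above replaces subprocess.Popen plus communicate() with a single subprocess.run call. A standalone sketch of that pattern with a placeholder command; note that capture_output=True returns bytes unless text=True is also passed.

```python
# Standalone sketch of the subprocess.run pattern adopted above; the command
# string is a placeholder, not the spark_submit command built by fink_mm.
import logging
import subprocess

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("sketch")

spark_submit = "echo spark-submit --master local[2] my_app.py"

# run() waits for the command and, with capture_output=True, stores stdout and
# stderr on the returned CompletedProcess; text=True decodes them to str.
completed = subprocess.run(spark_submit, shell=True, capture_output=True, text=True)

if completed.returncode != 0:
    logger.error(
        "command failed with code %s\nstdout:\n%s\nstderr:\n%s",
        completed.returncode,
        completed.stdout,
        completed.stderr,
    )
```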
10 changes: 6 additions & 4 deletions fink_mm/fink_mm_cli.py
@@ -45,14 +45,16 @@ def main():

     elif arguments["join_stream"]:
         if arguments["online"]:
-            from fink_mm.online.ztf_join_gcn import launch_joining_stream
+            from fink_mm.ztf_join_gcn import launch_join
+            from fink_mm.utils.application import DataMode

-            launch_joining_stream(arguments)
+            launch_join(arguments, DataMode.STREAMING)

         elif arguments["offline"]:
-            from fink_mm.offline.spark_offline import launch_offline_mode
+            from fink_mm.ztf_join_gcn import launch_join
+            from fink_mm.utils.application import DataMode

-            launch_offline_mode(arguments)
+            launch_join(arguments, DataMode.OFFLINE)

     elif arguments["distribute"]:
         from fink_mm.distribution.distribution import launch_distribution
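The CLI now routes both the online and offline paths through a single launch_join(arguments, DataMode...) call. DataMode, its STREAMING and OFFLINE members, and launch_join are real fink_mm names taken from the diff, but the enum values and function body sketched below are invented for illustration only.

```python
# Hypothetical sketch of an enum-based dispatch in the spirit of
# launch_join(arguments, DataMode.STREAMING / DataMode.OFFLINE).
# The member values and the function body are invented for illustration.
from enum import Enum


class DataMode(Enum):
    STREAMING = "streaming"  # live ZTF alert stream joined with GCN events
    OFFLINE = "offline"      # archived ZTF night data joined with GCN events


def launch_join(arguments: dict, data_mode: DataMode) -> None:
    night = arguments.get("--night", "20190903")
    if data_mode is DataMode.STREAMING:
        print(f"joining the live ZTF stream with GCN alerts for night {night}")
    else:
        print(f"re-running the join on archived ZTF data for night {night}")


if __name__ == "__main__":
    launch_join({"--night": "20230904"}, DataMode.OFFLINE)
```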
2 changes: 1 addition & 1 deletion fink_mm/gcn_stream/gcn_stream.py
@@ -215,7 +215,7 @@ def start_gcn_stream(arguments):
     config = get_config(arguments)
     logger = init_logging()

-    logs = return_verbose_level(config, logger)
+    logs = return_verbose_level(arguments, config, logger)

     # keep track of the gcn update
     gcn_tracking = pd.DataFrame(columns=["triggerId", "triggerTimejd", "nb_received"])
