Trouble Scaling XGBoost beyond in-memory training on databricks #10853

Open
gdubs89 opened this issue Sep 27, 2024 · 36 comments

@gdubs89

gdubs89 commented Sep 27, 2024

I'm currently training a binary classifier using a tiny sample of a dataset. The dataset is of size approx 50bn rows per day, and we persist the data for ~60 days, so in theory I could be training this data on up to ~3TN rows of data. Of course that's probably a little excessive, but currently I'm training on a 0.1% sample of a day's data, i.e. approx 50 million rows.

I do this by doing df = spark.read.parquet('s3://bucketname/data.pq').sample(fraction=0.001).toPandas()

I can play with this fraction a little bit, I've pushed it as far as 100 million rows and might be able to push it a bit further, but fundamentally the approach of pulling everything into a massive driver node and training in memory is not scalable and it's never going to allow me to train on 1 billion rows, or 10 billion rows, or more.

To that end, I've been looking for the canonical way to scale xgboost, i.e. do distributed training on databricks. I'm open to doing GPU training but my strong suspicion is that I'm far more memory-limited than compute limited (when training on 50million rows on a single EC2 machine, once the data has been read in and converted to dmatrices, the actual training is a breeze, takes 10-15 minutes), so my instinct is to try distributed CPU training.

Also, I'm using the following bells & whistles which I'll need any distributed training to support

  1. Early stopping
  2. Monotonicity constraints on some input features
  3. Native handling of categoricals (i.e. I'm not going to one-hot encode the data. Either xgboost needs to handle the categoricals internally, or needs to be able to interact with a sparse feature representation)

For the sake of benchmarking, I've prepared the following 4 datasets:

  1. ~600 million rows (being able to train on this would constitute success I think, this is significantly more than I'm ever going to be able to handle on a single big EC2 instance)
  2. ~50 million rows (this is the benchmark, I can train on this relatively comfortably on a single EC2 instance)
  3. ~50 million rows but with about half the number of columns
  4. ~5 million rows (for quick prototyping/testing of syntax)
    (in each case there's a train set, the sizes above give the size of the train set, and then there's a corresponding eval set approx 20% of the size)

I first tried to do this using xgboost-dask. This is the solution I landed on:

import dask.distributed
import dask.dataframe as dd
from xgboost import dask as dxgb
from xgboost import DMatrix as xgb_DMatrix

cluster = dask.distributed.LocalCluster(n_workers=8, threads_per_worker=16, memory_limit='91 GiB')
#was using a cluster of 8 i3.4xlarge, driver is also i3.4xlarge
client = dask.distributed.Client(cluster)

train_ddf = dd.read_parquet("s3://bucketname/train.pq", storage_options={...})
eval_ddf = dd.read_parquet("s3://bucketname/eval.pq", storage_options={...})

categorical_columns = ["X", "Y", "Z"]
features = ["A", "B", "C", "X", "Y", "Z"] # ABC are dense/numerical columns, XYZ are all integer-valued columns, but they should be interpreted as categorical

train_ddf[categorical_columns] = train_ddf[categorical_columns].astype('category').categorize()
category_mappings = {col: train_ddf[col].cat.categories for col in categorical_columns}
eval_ddf[categorical_columns] = eval_ddf[categorical_columns].astype('category')
for col in categorical_columns:
    eval_ddf[col] = eval_ddf[col].cat.set_categories(category_mappings[col])

dtrain = dxgb.DaskDMatrix(
    client=client,
    data=train_ddf[features],
    label=train_ddf['label'],
    enable_categorical=True
)

dvalid = dxgb.DaskDMatrix(
    client=client,
    data=eval_ddf[features],
    label=eval_ddf['label'],
    enable_categorical=True
)

params = {
    "objective": "binary:logistic",
    "max_depth": 8,
    "learning_rate": 0.1,
    "monotone_constraints": {"B": 1},
    "eval_metric": "logloss",
    "tree_method": "hist"
}

model = dxgb.train(
    client=client,
    params=params,
    dtrain=dtrain,
    num_boost_round=2000,
    early_stopping_rounds=10,
    evals=[(dvalid, 'eval')],
    verbose_eval=1
)

This "worked" when I used dataset 3 described above, but failed when I used dataset 2. I.e. 50 million rows and about ~20 columns worked but 50 million rows and ~50 columns was too much. I was also a little suspicious that dask wasn't utilising the worker nodes. I can't connect to the dask dashboard, I think it's something I'd need to talk to our databricks admin about (I tried to SSH into the driver but my connection timed out, to my best understanding, we'd need to unblock some port), but the databricks cluster dashboard only ever showed the driver node being engaged (in retrospect, it could also possibly have been just one worker being engaged, if this is deemed relevant I can re-run and check). Note that when I do print(client), it's telling me I have 128 threads (8*16, i.e. the number of worker cores) and ~500gb of RAM, but they don't seem to be being engaged by the training process.

If only one machine is being engaged, each of these machines has significantly less memory than the machine I used to train on the 50 million row dataset in memory, so it's not entirely surprising that this fell over at the point where it did. I tested this by firing up a "wonky" cluster, comprised of two r5d.16xlarge workers and a driver of the same type. This worked, but again only one machine was being engaged, so we've not gained anything over just training on a single large machine.

So my suspicion here is that raw dask doesn't play very well with databricks/spark, so instead I decided to try dask-databricks. So basically in the above code, replace

import dask.distributed
cluster = dask.distributed.LocalCluster(n_workers=8, threads_per_worker=16, memory_limit='91 GiB')
client = dask.distributed.Client(cluster)

with

import dask_databricks
client = dask_databricks.get_client()

Same deal; when I print(client), I see the number of threads/amount of memory I expect. However, when running on a cluster of 8 i3.4xlarge workers, I have the same scaling issues as previously: I can run on the 50 million row dataset with ~20 columns, but when I try on the set with ~50 columns, it falls over.

I'm now running a cluster of 12 r5d.8xlarge machines (I should have used r5d.16xlarge like I did before, for reproducibility), and the training run for the 50 million row dataset with 50 columns hasn't technically crashed, but it's been running for 50 minutes now (which, given how big this cluster is compared to the single machine on which I can train this in memory in ~10-15 minutes, is bad). When using dask-databricks, I can access the dask dashboard, and while I'm no expert on how to read it, it looks like all workers are engaged, but only about 1.5 of 32 cores are being used per worker. This is in line with what the databricks cluster's dashboard is telling me.

I also get a warning

/databricks/python/lib/python3.11/site-packages/distributed/client.py:3361: UserWarning: Sending large graph of size 41.96 MiB.
This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.

which I don't fully know what to do with.

The cluster I'm currently using has at least 3x more RAM and 4x more cores than the largest single EC2 machine, the one that I've been using to train on 50 million rows/50 columns (and that I've shown can be pushed a little bit further, at least to 100 million rows, maybe to 150m, probably not as far as 200m). I would also have hoped that when doing distributed training in dask, you'd get much more memory-efficient handling of the data than when pulling the data into pandas. And yet I'm not even getting close to being able to replicate the performance I get with a single EC2 instance, which does not seem to bode well for scaling up to 500 million rows and beyond.

Help either with this, or other ways to scale XGBoost beyond in-memory training would be greatly appreciated. I was hoping there would be an accepted way to do distributed xgboost training but alas, it doesn't seem that there is an accepted wisdom on how to do this.

Other notes:

  • I'm using the most recent, 15.4 LTS databricks runtime
  • When I ran this in Vanilla dask, I got verbose training output. When I used dask-databricks, I lost verbosity
@trivialfis
Member

Thank you for sharing your experience. Based on your description, you are trying to find some "best practices" for distributed training. I will try to do some more experiments and come up with something more comprehensive like a blog post. But for now, let's start with some checks:

  • Is the data reasonably balanced across workers? (One quick way to check is sketched after this list.) The overhead of waiting for one of the imbalanced workers is quite high. XGBoost relies on collective operations like allreduce during tree build; if every synchronization requires waiting, the total training time can be extremely slow. I recently observed on a 48-GPU cluster that a 5-second iteration can be stalled to above 20 minutes due to imbalanced data (2TB data, BTW). An imbalanced dataset also causes OOM errors.
  • The dask dashboard is really helpful, please try to get it configured before doing any serious work. Feel free to explore more about the dashboard, most of the performance issues require the dashboard to debug.
  • Yes, you will hit scaling performance limit eventually, as with all collective-based distributed training.
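For the first point, a minimal sketch of a balance check (assuming the train_ddf and client objects from the snippet in the original post; names are taken from there) might look like this:

from distributed import wait

train_ddf = train_ddf.persist()   # materialize the partitions on the workers
wait(train_ddf)

# rows per partition: a wide spread between min and max suggests imbalance
part_sizes = train_ddf.map_partitions(len).compute()
print(part_sizes.min(), part_sizes.mean(), part_sizes.max())

# how many keys (roughly, partitions) each worker currently holds
for worker, keys in client.has_what().items():
    print(worker, len(keys))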

@gdubs89
Author

gdubs89 commented Sep 27, 2024

On your first point, how would you suggest I dig into this a bit more? I've looked at three things:

  1. Databricks cluster dashboard
  2. Dask dashboard (note, when using dask-databricks, this works no problem)
  3. Running htop on a web terminal (presumably this only tells me about the driver node)

Some screenshots from the dask dashboard (I'm new to this so not entirely sure what I'm looking for)
[screenshots: dask dashboard]

Databricks cluster dashboard
[screenshot: Databricks cluster CPU utilisation]
(they're all hovering around 1.8%, which is less than 1/32)

Htop shows generally one CPU near 100% on the driver node, it occasionally bounces between different CPUs, but never more than one being engaged.

Also note from my previous point, when I used a cluster of 8 i3.4xlarge machines, which should collectively have easily enough memory to handle this data, it crashes. So I have some concerns that a lot/all of the data is being pulled onto the driver/a single worker node, but not sure how to debug this further.

And yes, to your last point, I'm not expecting to be able to scale this infinitely, training on 3 trillion rows of data is clearly a pipe dream. But given I can train on ~100 million rows on a single machine, I'd be very surprised if it weren't feasible to increase by 1, if not 2 orders of magnitude by going distributed.

@trivialfis
Member

Maybe start by observing the CPU/memory usage across workers; there's a "Workers" tab in the dask dashboard. I can't provide a definite answer on why a specific run is slow without running it myself, but in general it's a data balance issue. On GPU, sometimes I just repartition the data and the problem can be mitigated.

The XGBoost train function in the dashboard task view is actually a lambda function, one for each worker; do you see dask waiting for them to finish?
If you are using the default tree method (hist), consider using the QuantileDMatrix instead of the DMatrix for the training dataset, the former is much more memory efficient.
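As a sketch of that swap, based on the snippet in the original post (DaskQuantileDMatrix is the dask counterpart; the ref argument lets the eval set reuse the bin boundaries computed for the training data):

dtrain = dxgb.DaskQuantileDMatrix(
    client=client,
    data=train_ddf[features],
    label=train_ddf['label'],
    enable_categorical=True
)

dvalid = dxgb.DaskQuantileDMatrix(
    client=client,
    data=eval_ddf[features],
    label=eval_ddf['label'],
    enable_categorical=True,
    ref=dtrain   # reuse the quantile cuts from the training matrix
)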

I'm also working on improving the scaling at the moment and will add some logging facilities to XGBoost to help debug.

@gdubs89
Author

gdubs89 commented Sep 29, 2024

OK, I've got the vanilla dask dashboard working in the meantime (previously only had it working when using dask-databricks), so will now provide a detailed comparison of the dashboards in a variety of cases:

Scenario 1: ~50 million rows, 19 columns

First hardware setup: I used a "small, fat" setup, i.e. a small fleet of large machines, driver + 3 workers, all of type r5d.16xlarge (256 cores and ~2TB of memory in total).

dask-databricks: The whole process of reading in the data, categorizing, creating dmatrices and training for 50 epochs takes about 1 minute, of which ~45 seconds are spent training (and 15 seconds on data prep). When using dask-databricks, I don't get any verbose training output, but based on adding some print statements to the progress, here's a screenshot of the dask dashboard at a point when I'm fairly sure training is going on
[screenshot]
so we're getting decent CPU utilisation, but still nowhere near full utilisation (which would be 6400%). Also, this amount of data is using a tiny fraction of the memory, making me think that with this cluster I should easily be able to scale up by an order of magnitude, and would only need to scale the cluster a bit to get to two orders of magnitude.

vanilla dask:
[screenshot]
Pretty similar CPU and memory utilisation, I also get verbose training output. Data prep takes circa 15 seconds, training about 60 seconds.

For a benchmark, this dataset also easily fits in memory on a 6d1d.36xlarge (128 cores, ~1 TB RAM). In-memory training for 50 epochs (same process of creating dmatrices and fitting via the learning API) takes about 1 minute.

Scenario 2: ~50 million rows, 49 columns

dask-databricks:
Casting to categoricals and categorizing starts to take significant time here (100s). CPU utilisation is very poor.
[screenshot]
Creating DMatrices takes almost 3 minutes, and CPU utilisation is also poor. We do see all workers engaged and more than one CPU, but nowhere near full CPU engagement.
[screenshot]
Training starts to get quite squiffy; a screenshot doesn't really capture it well. At any one point, you tend to see one worker with quite high (20+ CPUs) utilisation while another 1 or 2 workers are close to 0%.
[screenshot]
Memory utilisation is also not very symmetric, in that 3 machines are much more engaged than the 4th. It's also quite surprising how much more memory this dataset takes up than the previous one. All of the extra columns are integer rather than dense/float, but perhaps the categorical type is more data hungry.

vanilla-dask:
Worker utilisation while categorizing:
[screenshot]
Takes about 80s

Then we get into dmatrix creation
[screenshot]
Dmatrices take almost 4 minutes to create.

Things get a bit squiffy when we get to training. The CPU utilisation fluctuates a lot, so a screenshot isn't that helpful, but in one I did take here, we see one of the workers close to maxed out (29/32 CPUs fully engaged) but two of them doing almost nothing. So indeed the hypothesis of imbalanced worker utilisation seems fair.
[screenshot]
Taken at another point in training:
[screenshot]

Training, once it finally started, took about 6 minutes to complete

In-memory training goes off without a hitch, the actual training stage takes 90s. Categorizing+dmatrix creation takes about 40s, obviously the reading from spark into pandas stage takes a bit longer (like 3 minutes)

I also tried using QuantileDmatrices for the larger dataset, just in the dask-databricks setup. QuantileDmatrices took about 4 minutes to create again.

Here's a training screenshot where it does look like 2 machines are being well-utilised and one is doing basically nothing, and here's another where it looks quite poor:
[screenshots]
If you watch the dashboard for a while, it's a little hard to discern what's going on. Basically it fluctuates between fairly high utilisation across the board, periods of low utilisation across the board, and periods when some workers are highly utilised and some are not. Training took 8 minutes.

I should also add, that in the larger dataset case, both on vanilla dask and dask-databricks, the whole thing was pretty shonky and inconsistent. It probably fails around 2/3 of the time for various reasons. When it fails, I just detach and re-attach the spark cluster, start a fresh dask cluster, and just run the code again, nothing else changed. Sometimes it works, sometimes it doesn't. One failure mode (this is easier to ascertain in vanilla dask than dask-databricks as you get more output) seems to be that DMatrix creation causes a worker to be terminated and restarted, which in turn means that training has the wrong IP addresses for the workers so fails. But I've also had more esoteric messages, and one case where nothing went wrong but training losses started to blow up, which is very odd indeed.

Finally, in the vanilla dask setup, I tried scaling this up to the ~600 million row dataset (full 49 columns). This didn't even make it past the read-in and repartition stage; it's been 25 minutes and I'm getting almost no CPU utilisation, so I think I'm going to shut this off.
[screenshot]

In conclusion, for this task which can still be done pretty comfortably in-memory on a single big EC2 machine, when using Dask I find:

  • The process is inconsistent, frequently fails, sometimes works, little rhyme or reason as to why
  • DMatrix creation seems to be a heavy process that isn't making much use of parallelisation
  • When it comes to training, CPU utilisation certainly is far from 100% across the board
  • Memory utilisation seems low, meaning I'd expect to be able to scale this up to much larger datasets and worst case it takes forever. But that does not appear to be the case
  • Despite my clusters having significantly more firepower than the single EC2 machine I use for in-memory training, distributed training on this dataset is significantly slower than doing it in memory (not to mention much less reliable)
  • When I try to scale up to an even larger dataset, it's really not clear what's going on. A simple dd.read_parquet(..).repartition() doesn't seem to work, which admittedly isn't an xgboost problem per se, but perhaps plays into my original post of "still looking for the canonical way to scale xgboost beyond in-memory training" (a size-based variant of that repartition is sketched below)
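For reference, the size-based variant mentioned above would look roughly like this (the target partition size is an assumption, not a recommendation):

train_ddf = dd.read_parquet("s3://bucketname/train.pq", storage_options={...})
train_ddf = train_ddf.repartition(partition_size="256MB")   # aim for a modest number of evenly sized partitions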

I will also try to do all of the above with differently shaped clusters which use more and smaller workers, will post updates tomorrow.

@gdubs89
Author

gdubs89 commented Sep 30, 2024

Same analysis using 12 r4.4xlarge workers (+ same driver). This gives us about 200 cores and 1.5TB of RAM, but with a much "more distributed" setup

Vanilla Dask

Process takes 5 minutes end to end.
[screenshot]
CPU utilisation is extremely low, as is memory utilisation (again making it rather implausible that I'm gonna get any OOM error when scaling up to more columns or even scaling up the number of rows by an order of magnitude)

On the larger dataset (i.e. same number of rows but more columns, where all the additional columns are of categorical type), the first time it failed at the dmatrix creation stage. The second time, it managed to create the dmatrices, but I'm still getting some weird output that suggests it failed to distribute them quite as intended:
[screenshot]
As before, when it gets to training, we see highly heterogenous distribution of CPU utilisation
[screenshot]

The training then results in the loss starting to explode before it fails
[screenshots]
(I've partially redacted that output as it contained some information about the training data)

Dask Databricks
Noticeably better performance on the smaller dataset. End to end, the read-in + train process took 35s, and CPU utilisation, while far from full, was much better:
[screenshot]

When we get to the bigger dataset, again things are just far more variable than with the smaller one
[screenshot]
and a screenshot doesn't fully capture it, but what is clear is that

  • Overall CPU utilisation is lower
  • It fluctuates a lot more

However, end to end, the process completed in 4 minutes, and the actual training only took approx 1 of those minutes.
The memory utilisation again is so low, that I figured why not try this on a dataset that's 10x larger than this.

As before, when I try this, it seems to just get stuck at data read-in/repartition stage, with almost zero CPU utilisation
[screenshot]

So in summary:

  • When I go to this more horizontally scaled dask cluster, things really stop working in vanilla dask.
  • dask-databricks still does OK, potentially outperforms in-memory training by a little bit. But CPU utilisation is still pretty asymmetric
  • despite memory utilisation being so low on even the larger dataset, when I try to scale to a dataset that's 10x the size, I can't get past the data read-in/repartition stage

@trivialfis
Member

trivialfis commented Sep 30, 2024

This is a combination of dask data balancing issues, dask memory usage and data spilling issues (the read/partition), XGBoost training performance issues, and optimization in Databricks. Let's get some of the easy issues resolved first.

  • If the output is not printed as expected, it might be caused by a buffered stderr/stdout. I recently ran into an issue where a worker didn't flush the output until it was killed, accumulating ~600MB of logging! There's no workaround in XGBoost at the moment, but you can define your own printing callbacks that flush the output.
  • The QDM is primarily used to reduce memory usage during DMatrix construction, and performance is more or less a bonus. From the screenshot, I would suggest it's doing what it's expected to do (reducing memory).
  • The DMatrix construction for the dask interface makes sure all prior lazy computations in dask are materialized; as a result, sometimes you get completely unrelated errors in the DMatrix constructor. A way to separate the processes is using client.persist and distributed.wait before the DMatrix (see the sketch after this list).
  • PySpark takes (significantly) longer to construct the DMatrix than dask as it has this weird design that needs to construct and iterate through n_samples numpy arrays instead of a few partitions. On the other hand, the data distribution for PySpark is more stable and well-balanced.
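A sketch of that separation, reusing the names from the original post:

from distributed import wait

# force all prior lazy dask computations to run now, so data-prep failures
# surface here rather than inside the DMatrix constructor
train_ddf = train_ddf.persist()
eval_ddf = eval_ddf.persist()
wait([train_ddf, eval_ddf])

dtrain = dxgb.DaskDMatrix(
    client=client,
    data=train_ddf[features],
    label=train_ddf['label'],
    enable_categorical=True
)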

I would suggest that the first thing that needs to be done is to ensure the data is well-balanced based on these hints from the screenshots:

  • The pipeline ran into empty workers, meaning these workers were starving with no data on them.
  • The CPU usage is flaky, which is a sign of workers waiting for each other.
  • OOM error is flaky, which indicates from time to time the data is being concentrated into a single worker.

cc @fjetter @mrocklin for awareness.

@gdubs89
Author

gdubs89 commented Sep 30, 2024

Wow OK, I think I've mostly solved the issue of it just "not doing anything" when I try to scale to data sizes which cannot be trained in memory.

The problem was that the raw dataset is ~50billion rows and is spread over ~120k partitions. I am doing a downsampling in pyspark (before starting the dask cluster) and writing a downsampled ~500 million row dataset to disk. Little did I know that spark was retaining the original partitioning and thus writing this 500 million row dataset to disk over 120k partitions, which is A) a lot of partitions and B) extremely fragmented. Either way, it seems this just totally overwhelmed dask.

If I do a repartition in spark before writing to disk, dask can handle it, and I now seem to be successfully training on 500 million rows of data, only using ~15% of the RAM on the aforementioned cluster, so I'm going to scale this up to ~2 billion rows and see how we go. So I've successfully moved beyond a data regime which could be trained in memory 🚀
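For concreteness, the spark-side fix looks roughly like this (the sample fraction, partition count and paths are illustrative, not the exact values used):

sampled = spark.read.parquet("s3://bucketname/data.pq").sample(fraction=0.01)

(sampled
 .repartition(1024)   # compact the ~120k tiny partitions into a sane number of larger files
 .write.mode("overwrite")
 .parquet("s3://bucketname/train.pq"))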

CPU utilisation is still a bit heterogeneous at times, but you do get moments of beauty like this one
[screenshot]
if you watch the dashboard for long enough👌

Will create some somewhat larger datasets and see how far I can scale this paradigm and report back.

In the meantime, one question around partitioning of the train and eval sets. Is it recommended/necessary for the train and eval sets to have the same number of partitions? I'm a little unclear on what's done with each partition under the hood. My concern would be that if you don't have the same number of partitions (i.e. use fewer for the eval set because it's smaller), one or more of the workers might not get an eval set. When I originally had the eval set partitioned with fewer partitions than the train set (in ratio to how big they are), I got a rather difficult to parse error about empty dmatrices.

@trivialfis
Member

trivialfis commented Sep 30, 2024

Excellent progress! No, it's not required to have the same number of partitions. Preferably both of them have partitions for all workers (no worker is being starved for either dataset).

XGBoost takes what's given; it doesn't move data or anything. Internally, it just iterates over partitions for each dataset independently. As long as partitions within each dataset are aligned (they come from the same dataframe, for instance), then it's fine.

@gdubs89
Author

gdubs89 commented Oct 1, 2024

So I have managed to scale to 2 billion rows, but this does seem to be the point where it started to struggle. Dask started to complain about the size of the graph
[screenshot: large-graph warning]
What's interesting is that the graph seems to be getting larger. The origin of this is probably that I'm doing some sequential training where I increase the learning rate (to avoid the situation where you end up training for 800 rounds before early-stopping, but the last 500 rounds are only giving very marginal improvements). The error in fact links to here. While this isn't what I'm doing, I wonder whether the fact that I have a for loop of learning rates, and within the loop I'm doing model = dxgb.train(..., xgb_model=model['booster']), is causing some highly nested dask graph to be built, which in turn is probably making the later training rounds less efficient.

Any ideas how to mitigate this? [edit: one idea I had to mitigate this was to write the model to disk and then load it back in, as that might break the graph. And it did, in the sense that the warning now said my graph was only 13Mb compared to 54mb previously (and it kicked in after more loops of the training procedure), but it doesn't seem to have solved the problem, as in I'm still getting a warning about large dask graphs]
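A sketch of that checkpointing workaround (the path is hypothetical; the point is just that a reloaded booster carries no dask history):

import xgboost as xgb

model['booster'].save_model("/dbfs/tmp/xgb_checkpoint.json")

booster = xgb.Booster()
booster.load_model("/dbfs/tmp/xgb_checkpoint.json")

model = dxgb.train(
    client=client,
    params=params,
    dtrain=dtrain,
    num_boost_round=100,
    early_stopping_rounds=10,
    evals=[(dvalid, 'eval')],
    xgb_model=booster   # continue training from the reloaded booster
)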

Second edit: Interestingly, increasing the max_depth seems to significantly increase the size of the graph in the warning

@trivialfis
Member

Any ideas how to mitigate this

My first guess is the booster object is too large, and dask complains about it. If that's the case, the warning is harmless, and there's no need to work around it. We have to transfer the booster somehow. Splitting it up can disable the warning but creates no benefit. If that's not the cause, please share your code. Or at least a gist of the code and the hyper-parameters you use.

@gdubs89
Author

gdubs89 commented Oct 3, 2024

Code is unchanged from original post, other than that I've created a loop to increase the learning rate:

# when it gets to the final learning rate, make the number of boosting rounds
# functionally infinite and let early stopping determine how long we train for
lr_rounds = {0.1: 100, 0.2: 100, 0.4: 100, 0.8: 100_000}

model = None
for LR in sorted(lr_rounds.keys()):
    params['learning_rate'] = LR

    if model is None:
        model = dxgb.train(
            client=client,
            params=params,
            dtrain=dtrain,
            num_boost_round=lr_rounds[LR],
            early_stopping_rounds=10,
            evals=[(dvalid, 'eval')]
        )
    else:
        model = dxgb.train(
            client=client,
            params=params,
            dtrain=dtrain,
            num_boost_round=lr_rounds[LR],
            early_stopping_rounds=10,
            evals=[(dvalid, 'eval')],
            xgb_model=model['booster']  # continue from the previous booster
        )

(I added a few more bells and whistles to make sure that if it early stops for one of the learning rates before 0.8, that the next boosting round starts from the optimal model rather than the final one, but I don't think that should affect anything)
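One way to do that last part, sketched on the assumption that early stopping actually triggered (Booster objects can be sliced by boosting round):

booster = model['booster']
# keep only the trees up to and including the best round before continuing
booster = booster[: booster.best_iteration + 1]
# then pass xgb_model=booster to the next dxgb.train call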

Broadly speaking this won't train for more than 400 rounds for the data I have, and I'm exploring maxdepths from ~8-16. So 400 trees of depth 16 are not trivial in terms of memory consumption, but also still a fraction of the data volume being handled on each worker.

I'm happy to accept this as harmless if you don't think this is a problem. I haven't had any more problems with training failing or being erratic.

@trivialfis
Member

Thank you for sharing! The code looks fine.

but also still a fraction of the data volume being handled on each worker

Dask doesn't usually send large objects across workers, which can hurt performance due to network constraints. But gathering a single booster for the client process should be fine.

Feel free to close the issue if you have no further questions. ;-)

@gdubs89 gdubs89 closed this as completed Oct 3, 2024
@wbo4958
Contributor

wbo4958 commented Oct 8, 2024

I'm wondering if we can have a doc for running xgboost with spark/dask on different cloud environments? @trivialfis

@gdubs89
Author

gdubs89 commented Oct 11, 2024

small update on this (not sure it's worth re-opening the issue over), I am struggling when the tree maxdepths are increased. The aforementioned warning about graph sizes is exacerbated by having a higher maxdepth, which might be related (potentially the trees themselves taking up more room in memory?)

The problem is that I would anticipate that the way you're going to get more performance out of scaling training to bigger datasets is precisely that optimal performance will happen at higher maxdepths, i.e. due to having more data, you can fit more complex models before you overfit. (I'm also seeing this empirically: the deepest maxdepth I've managed to successfully train with has the best performance on a test set.)

I tried to reconfigure my cluster to have roughly the same amount of overall memory (an amount which comfortably allowed me to train xgboost with a maxdepth of say 8 without being anywhere near the memory limit according to the dask dashboard, roughly around 50% utilisation across the board) but with fewer, larger workers (in itself not cost ideal; this increased my DBU/hr in databricks by about 20%), and still ran into the error.

Furthermore, it happens deep into training, often after like 3 hours, so it's not really feasible from a time or cost perspective, to experiment with lots of little hyperparameter tweaks (e.g. introduce some gamma-reg so that not all trees bottom out, perhaps up the learning rate a little so we have fewer trees, start training with high depths and then bring down the maxdepth and up the learning rate for later training rounds, etc).

And I've also not been able to get verbose training output to work in dask-databricks (annoyingly it works in vanilla dask, but I found vanilla dask on databricks to be extremely unreliable), but verbose output would help a little bit in terms of being able to understand quickly whether a set of hyperparameters was going to give acceptable performance from an ML metrics perspective and is thus worth pursuing.

@wbo4958
Contributor

wbo4958 commented Oct 11, 2024

XGBoost PySpark in Databricks is quite stable; maybe you can try XGBoost PySpark. It's quite fast when using GPUs.
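For reference, a minimal sketch of that estimator (column names and num_workers are placeholders; train_sdf is a hypothetical Spark DataFrame):

from xgboost.spark import SparkXGBClassifier

clf = SparkXGBClassifier(
    features_col="features",   # expects a single assembled vector column
    label_col="label",
    num_workers=8,             # number of spark tasks used for training
    tree_method="hist"
)
model = clf.fit(train_sdf)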

@trivialfis trivialfis reopened this Oct 13, 2024
@trivialfis
Member

trivialfis commented Oct 13, 2024

For the graph size warning, I suggest looking for issues like whether the partition size is too small; small partitions generate a larger operation graph and slower performance. On the other hand, if it's caused by the XGBoost booster model, which happens only at the end of the training, then please ignore it.

Thus far, the only critical thing for XGBoost to achieve good distributed training performance is data balancing. It can be mad slow if the data size is skewed. The latency caused by waiting accumulates, rather than the run simply being bottlenecked by the slowest worker.

The training function and the estimators accept a callback function. You can define your own logger using callbacks. Please find an example in the demo/guide-python directory. With a callback, you have full control over logging. Maybe writing the results to a remote file server?
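A minimal sketch of such a callback (prints and flushes the latest eval result every round; the class name is illustrative):

import xgboost as xgb

class FlushingLogger(xgb.callback.TrainingCallback):
    def after_iteration(self, model, epoch, evals_log):
        # evals_log is {dataset_name: {metric_name: [values per round]}}
        msg = " ".join(
            f"{data}-{metric}:{values[-1]:.5f}"
            for data, metrics in evals_log.items()
            for metric, values in metrics.items()
        )
        print(f"[{epoch}] {msg}", flush=True)
        return False   # returning True would stop training

# usage: dxgb.train(..., callbacks=[FlushingLogger()])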

@gdubs89
Author

gdubs89 commented Oct 14, 2024

@wbo4958: I believe you're talking about the SparkXGBClassifier? If I'm not mistaken, it neither accepts Spark's sparse feature representation as input, nor does it natively support categorical variables, and consequently I don't think it's really suitable for doing large-scale ML with many high-cardinality categoricals.

@trivialfis: I can take another look at the partitioning strategy, but the issue here is that it trains just fine on the data I've prepared, on the cluster I set up (never getting north of ~50% memory utilisation on any worker), until I start pushing up the maxdepths. So this suggests something to do with tree size, but I also find it rather dubious that the issue would be that the ensemble of trees itself is taking up too much memory, because I'm being relatively aggressive about upping the learning rate, so we're never getting to more than say 500 trees, and 500 trees of maxdepth 14...back of the envelope says that's going to be a small fraction of the size of the full training data (and I'm not sure whether the full ensemble is being copied to all workers, but even then, I'd be surprised if it were taking up more than a single-digit % of each worker's RAM).

Thanks for the tip on logging, I'll have a look.

@trivialfis
Member

trivialfis commented Oct 16, 2024

So this suggests something to do with tree size

It makes sense. Dask prefers small data transfer, which is unrelated to the total size of the data. It raises a warning if it needs to send large objects or graphs across the network.

issue would be that the ensemble of trees itself is taken up too much memory

It's not about taking up too much memory; you can push the memory usage to its limit without seeing the dask warning. It's that dask considers sending large objects across the network, or using complex graphs, to be inefficient, and it warns users to look for potential causes and optimizations. It's a performance warning.

Having said that, I will ping @rjzamora for better insight. I'm not familiar with the dynamics inside dask.

@rjzamora
Contributor

rjzamora commented Oct 16, 2024

Having said that, I will ping @rjzamora for better insight. I'm not familiar with the dynamics inside dask.

I'm not sure if this is helpful at all, but the UserWarning: Sending large graph ... you get from dask is entirely related to the size of the graph when it is pickled on the client process and sent over the wire to the scheduler. Although the size of the graph is proportional to the number of tasks (which is proportional to the number of partitions), this warning is rarely caused by the number of tasks being huge (although that can certainly happen).

In practice, the large-graph warning usually means that you are constructing a graph on the client that contains data that it probably shouldn't contain. For example, if each of your tasks will be operating on a distinct partition of Array data, you wouldn't want to pass that data to the workers through the graph. Rather, you would want your tasks to include the necessary logic to read that data from disk when it executes on the worker.

In the case of XGB, it does seem possible that dask is just complaining about the size of the booster when you send it to the cluster. @trivialfis is correct that the model needs to get to the workers somehow. As long as you aren't explicitly passing a copy of the booster in every training task within the graph, then there is probably not a "better" option.

@gdubs89
Author

gdubs89 commented Oct 16, 2024

Not sure what is meant by "explicitly passing a copy of the booster in every training task within the graph" ?
What could cause this to happen?

The only perhaps non-standard thing I'm doing is dynamically updating the learning rate using code very similar to above. Is it possible that this is causing dask to copy not just the latest version of the booster (containing all the trees) but also previous versions?

When you say dask might be complaining about the size of the booster (I believe it is btw, because I can see that if I up the max depth, the graph size in the warning message increases), is there some reason why a large graph might cause problems long before I physically run out of memory? I was using a cluster of r5d.8xlarge, i.e. 256 gb of memory on each, and early in training they were at circa 50% memory. Back of the envelope, there's no way the booster itself is gonna start taking up on the order of 100gb, and also the number in the warning message is more of the order of 5gb.

@rjzamora
Contributor

Not sure what is meant by "explicitly passing a copy of the booster in every training task within the graph" ?
What could cause this to happen?

I don't think you are doing this. My intention was more so to explain "who" this warning is intended for. My general impression is that the large-graph warning itself shouldn't be a concern to you. With that said, I am concerned if the size of the graph grows with every iteration and you plan on doing this for many iterations.

Is it possible that this is causing dask to copy not just the latest version of the booster (containing all the trees) but also previous versions?

I honestly don't know off hand - Are you planning to run many iterations?

When you say dask might be complaining about the size of the booster (I believe it is btw, because I can see that if I up the max depth, the graph size in the warning message increases), is there some reason why a large graph might cause problems long before I physically run out of memory?

I'm definitely not 100% sure about this, but my impression is that the graph is large-enough to meet Dask's simple heuristic to warn the user, but not actually large enough to cause problems. A large graph can (temporarily) slow down the scheduler if there is a huge number of tasks or it is overwhelmed with serialization/deserialization.

@trivialfis
Member

is there some reason why a large graph might cause problems long before I physically run out of memory

My understanding is that dask simply considers the graph size abnormal, which is unrelated to the total amount of memory available in the system.

@gdubs89
Author

gdubs89 commented Oct 17, 2024

Is it possible that this is causing dask to copy not just the latest version of the booster (containing all the trees) but also previous versions?

I honestly don't know off hand - Are you planning to run many iterations?

By iterations, do you mean separate training runs with new learning rates? If so, not really. I think I ran 100 boosting rounds at a learning rate of 0.1, another 100 at a learning rate of 0.2, another 100 at a learning rate of 0.4, and then finally set the learning rate to 0.85 and let it train until it early stops.

My best estimate is that the whole ensemble would come in at south of 1000 trees, and as the maxdepth increases, the optimal stopping point will come forward and we'll end up with fewer trees, but presumably more terminal nodes overall/a booster that consumes more memory.

I could of course just not do this, but in practice I've found that you need a decently low learning rate to get good performance, but low learning rates lead to extremely slow convergence and you need to up the learning rate after a while if you don't want it to train for thousands of rounds.

If we think the iterative training is likely to be the issue, I suppose one mitigation would be to just train for 200 boosting rounds with a learning rate of 0.1, and then set the learning rate to maybe 0.6 and train until we early stop. This would likely end up with more trees in the ensemble overall, and perhaps marginally worse generalisation, but nothing too catastrophic. If even that is likely to cause problems and I need to train in one shot until we early stop, I think it could get quite tricky to choose a learning rate that would generalise well but would not take unacceptably long to train.

I guess an option of desperation (the thing I always tell people not to do) would be to just estimate what a good number of boosting rounds is, and do away with early stopping (could then repurpose the eval set for more training data)

@trivialfis
Member

trivialfis commented Oct 17, 2024

The booster is transferred once per training session. If you run train(client, {"learning_rate": 0.1}, ..., num_boost_rounds=200), the booster is transferred at the end of the train function. Meaning, there's only one transfer in this function. If you try 3 different learning rates with three different calls to train, then there are three transfers of the booster object.

My suggestion is still to just ignore the graph size warning. There's no effective solution to it yet and it doesn't affect your model training.

@gdubs89
Author

gdubs89 commented Oct 17, 2024

Yeah, sorry, my bad, I looked back through the messages and I didn't make this clear when I said "I'm struggling". The issue is not just the graph warning; training is actually failing when I increase the maxdepth. It's also taking a long time before it fails (>1 hour), so it's quite difficult to iterate. So with the same train/eval set, on a given cluster configuration, I'm able to train with maxdepth<=10, but it fails for maxdepth=12.

Annoyingly I didn't make a note of the traceback at the time, but I convinced myself it seemed memory related. I can burn some more compute hours and reproduce if useful.

@trivialfis
Member

trivialfis commented Oct 17, 2024

max_depth=12 sounds fine; the memory usage of really deep models is dominated by the histogram during training. For a dataset with 50 features (yours), and with max_bin=256 (default), the gradient histogram is 16 * 50 * 256 * (2^13 - 1) bytes (f64 grad + f64 hess = 16 bytes), which is roughly 1.56 GB, not a lot. I don't know what's causing the failure. It might be true that the cluster is already on the brink of an OOM error and the extra depth is the last straw. But some more investigation would be helpful.
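Spelled out as a quick back-of-the-envelope check (not xgboost's exact allocation):

n_features, max_bin, max_depth = 50, 256, 12
n_nodes = 2 ** (max_depth + 1) - 1            # 8191 nodes in a complete binary tree
bytes_per_bin = 16                            # f64 grad + f64 hess
hist_bytes = bytes_per_bin * n_features * max_bin * n_nodes
print(hist_bytes / 1024 ** 3)                 # ~1.56 GiB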

@gdubs89
Author

gdubs89 commented Oct 17, 2024

seeing as it took a few hours before it failed, I wasn't watching the dask dashboard at the time of failure, but ~20 minutes into training, all workers were at circa 50% memory usage, which is why I was saying previously that it seemed incredibly unlikely that a large learner size was pushing it over the edge.

I'll re-run and post a detailed error message

@gdubs89
Author

gdubs89 commented Oct 18, 2024

hmmm OK, upon re-running it, I've been keeping an eye on the dask dashboard, checking in every 20 minutes or so.

For the first ~2 hours (which roughly covered the first two sets of 100 boosting rounds, at LR=0.1 and then LR=0.2), memory utilisation across the cluster ranged from 30-60%, but it was very stable (as in, there was one machine with 60% utilisation, but I checked every 10-20 minutes for 2 hours and the number barely moved).

But then during the 3rd set of 100 boosting rounds at LR=0.4, I get an OOM error. Now I wasn't watching the dask dashboard constantly, so I can't confirm that the worker with the highest memory load never exceeded 60%, but it seems improbable or at least unclear why this would have happened, given it was so stable, hovering around 60% for a few hours/many training iterations.

Here's the traceback
[traceback screenshots]

(bottom few lines are the output I print to screen every time we start/finish a training run)

Edit: This is technically from a different run (I decreased the maxdepth by 1), but I'm pretty sure the same happened last time. I.e. currently it hasn't technically failed yet, but the dask dashboard is showing that nothing is happening, and there's been a tonne of spill:
[screenshot: dask dashboard showing spilled memory]
I believe it will eventually fail with an OOM error, but that takes a long time to happen, long after the dask dashboard makes it clear that something has gone wrong.

@trivialfis
Member

Hmm, thank you for sharing. I see a severe imbalance there, with a few workers having significantly less data than others.

@gdubs89
Author

gdubs89 commented Oct 18, 2024

Yeah, let me try and get a screengrab of what it looks like early on in training when things are healthy.

@trivialfis
Member

We will try to work on automated data balancing. Without it, it isn't easy to control the memory usage.

@gdubs89
Author

gdubs89 commented Oct 18, 2024

[screenshot]
So this is like 2 hours into training with a lower max depth, so it does seem to be caused/exacerbated by upping the maxdepth 🤔

@trivialfis
Member

trivialfis commented Oct 18, 2024

@gdubs89 Could you please share the following information:

  • all hyper-parameters
  • data shape, including number of samples, feature, sparsity/density
  • input data type (float32/64, categorical).
  • If there are categorical features, please share the cardinality. Are you performing some encoding?

Please share the version of the above information closest to the version that causes issues. I will try to reproduce and profile next week.

@gdubs89
Author

gdubs89 commented Oct 21, 2024

Sure:
I'm using a cluster of 16 r5d.8xlarge machines (+ same driver), so ~4TB of memory and 512 cores.

My training params are set by:

params = {
    "objective": "binary:logistic",
    "max_depth": max_depth,  # this is where the problem arises, whether I set this to 8 or 14
    "monotone_constraints": {"continuous_feature_x": 1, "continuous_feature_y": 1},
    "eval_metric": "logloss",
    "tree_method": "hist"
}  # so I'm mostly using default hyperparameter values

As per above, I start with a low learning rate and then turn it up, my learning rate to boosting rounds dictionary is {0.1: 100, 0.2: 100, 0.4: 100, 0.8: 20_000}

When it comes to the data:

  • Training set is 2.15 x 1e9 rows, eval set is 560 million rows
  • There are 48 features, 1 binary target
  • Of the 48 features, we have 30 integers and 18 floats
  • The categorical features have already been mapped to integer using a hashmod function. In fact all 30 of those integer features represent categorical variables (for how I treat these categoricals, see code in OP)
  • As regards the datatypes, when just doing a spark.read.parquet() and then getting the datatypes, they are int and float. I'm not entirely sure what happens under the hood in dask when I read them in. And just to reiterate, I'm not explicitly using a sparse encoding for the categoricals, but I think this is what happens when I cast columns as categorical types in dask and then convert to dmatrix with enable_categorical=True?
  • I used 5000 as my mod when hashmodding the categoricals, so some of the categoricals have a cardinality of 5000, but some of them don't; here's a CDF of the cardinality of the integer columns:
    [screenshot: CDF of categorical cardinalities]

And I'm running databricks runtime 15.4 LTS, dask_databricks==0.3.2 and xgboost==2.1.1

@gdubs89
Author

gdubs89 commented Oct 23, 2024

An extra datapoint to add to this: I did manage to train with maxdepth=14 when I just did it in one training call with early stopping, rather than sequentially training and turning up the learning rate. Unfortunately, even with a relatively high learning rate of 0.3, this took ~8 hours until it early-stopped (as opposed to more like 3-4 hours at maxdepth=12 with iteratively increasing learning rates).

So while this training in steps is causing some issues, I would regard this as more than just a nice to have (especially if I wanted to increase the training data size/cluster by an order of magnitude)

@trivialfis
Member

Thank you for the detailed information. I'm working on the dask interface now. This PR should help with the issue of retrieving evaluation logs from databricks: #10942.

Will look into memory usage.
