Improve clusters and several bug fixes (#1141)
https://huggingface.co/spaces/lilacai/daniel_staging

Clustering (backend):
- Lower `min_cluster_size` to 5 for categories, so that the category clusters are
  more coherent (see the usage sketch after these notes).
- Add a 7-second request timeout (OpenAI's 99th-percentile response latency is
  roughly 3-4 sec) to avoid the default 10-minute timeout. We can now title the
  clusters of 1M docs (11k clusters) in 8 minutes.
- Disable the OpenAI client's internal retries, so requests are no longer retried
  twice (once internally and once by our own retry logic). A sketch of the
  timeout/retry setup follows this list.
- Replace "request" with "snippet" in the prompt to avoid biasing titles towards
  user requests; this improves clustering of forums, emails, and general text.
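
For reference, a minimal sketch of the timeout-and-retry setup the two bullets
above describe. It mirrors the `clustering.py` changes further down in this diff,
but the helper name `title_snippets` and the bare prompt are illustrative only:

```python
import instructor
import openai
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_random_exponential

_NUM_RETRIES = 16


def _openai_client():
  # 7-second per-request timeout (OpenAI's 99th-percentile latency is ~3-4 sec)
  # and max_retries=0 so the SDK never retries internally; tenacity owns retries.
  return instructor.patch(openai.OpenAI(timeout=7, max_retries=0))


@retry(
  retry=retry_if_exception_type((openai.RateLimitError, openai.APITimeoutError)),
  wait=wait_random_exponential(multiplier=0.5, max=60),
  stop=stop_after_attempt(_NUM_RETRIES),
)
def title_snippets(prompt: str) -> str:
  # Hypothetical helper; the real logic lives in summarize_request() below.
  response = _openai_client().chat.completions.create(
    model='gpt-3.5-turbo-1106',
    max_tokens=50,
    messages=[{'role': 'user', 'content': prompt}],
  )
  return response.choices[0].message.content or ''
```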

UI:
- Make the histograms reactive to the currently selected group in "group by".
- Make the pivot view reactive to searches (e.g. keyword search, metadata search).
- Remember the schema and nav bar state when flipping between the cluster view
  and the item view.
- Fix a bug with the search box state after a page refresh.
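
For context, clustering after this change can be kicked off roughly as in the
sketch below. The project dir, dataset namespace/name, and the 'text' column are
placeholders, and the `get_dataset` call is assumed from Lilac's public API;
`dataset.cluster` and `min_cluster_size` are taken from the tests in this commit.

```python
# Hypothetical usage sketch; paths, dataset names and the column are placeholders.
import lilac as ll

ll.set_project_dir('./my_project')
dataset = ll.get_dataset('local', 'my_dataset')

# Document clusters use `min_cluster_size` (default now 10); the category
# (super-cluster) pass internally uses the smaller MIN_CLUSTER_SIZE_CATEGORY (5).
dataset.cluster('text', min_cluster_size=10)
```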
dsmilkov authored Jan 29, 2024
1 parent c7e7e28 commit 4219c42
Showing 16 changed files with 198 additions and 105 deletions.
3 changes: 0 additions & 3 deletions lilac/__init__.py
@@ -18,7 +18,6 @@
from .env import * # noqa: F403
from .env import LilacEnvironment, get_project_dir, set_project_dir
from .formats import * # noqa: F403
from .formats import OpenChat, ShareGPT
from .load import load
from .load_dataset import create_dataset, from_dicts, from_huggingface
from .project import init
@@ -73,8 +72,6 @@
'deploy_project',
'deploy_config',
'SpanVector',
'ShareGPT',
'OpenChat',
'download',
'upload',
'register_embedding',
141 changes: 89 additions & 52 deletions lilac/data/clustering.py
@@ -44,8 +44,13 @@

_SHORTEN_LEN = 400
_TOP_K_CENTRAL_DOCS = 7
_TOP_K_CENTRAL_TITLES = 15
_TOP_K_CENTRAL_TITLES = 20
_NUM_THREADS = 32
_NUM_RETRIES = 16
# OpenAI rate limits you on `max_tokens` so we ideally want to guess the right value. If ChatGPT
# fails to generate a title within the `max_tokens` limit, we will retry with a higher value.
_INITIAL_MAX_TOKENS = 50
_FINAL_MAX_TOKENS = 200

CLUSTER_ID = 'cluster_id'
CLUSTER_MEMBERSHIP_PROB = 'cluster_membership_prob'
@@ -57,7 +62,8 @@

FIELD_SUFFIX = 'cluster'

MIN_CLUSTER_SIZE = 5
MIN_CLUSTER_SIZE = 10
MIN_CLUSTER_SIZE_CATEGORY = 5
UMAP_DIM = 5
UMAP_SEED = 42
HDBSCAN_SELECTION_EPS = 0.05
@@ -76,7 +82,10 @@ def _openai_client() -> Any:
'Please install it with `pip install openai`.'
)

return instructor.patch(openai.OpenAI())
# OpenAI requests sometimes hang, without any errors, and the default connection timeout is 10
# mins, which is too long. Set it to 7 seconds (99%-tile for latency is 3-4 sec). Also set
# `max_retries` to 0 to disable internal retries so we handle retries ourselves.
return instructor.patch(openai.OpenAI(timeout=7, max_retries=0))


def _snippet_to_prefix_and_suffix(text: str) -> str:
@@ -88,7 +97,7 @@ def _snippet_to_prefix_and_suffix(text: str) -> str:


class Title(BaseModel):
"""A 4-5 word title for the group of related requests."""
"""A 4-5 word title for the group of related snippets."""

title: str

Expand All @@ -97,7 +106,7 @@ def summarize_request(ranked_docs: list[tuple[str, float]]) -> str:
"""Summarize a group of requests in a title of at most 5 words."""
# Get the top 5 documents.
docs = [doc for doc, _ in ranked_docs[:_TOP_K_CENTRAL_DOCS]]
texts = [f'BEGIN_REQUEST\n{_snippet_to_prefix_and_suffix(doc)}\nEND_REQUEST' for doc in docs]
texts = [f'BEGIN_SNIPPET\n{_snippet_to_prefix_and_suffix(doc)}\nEND_SNIPPET' for doc in docs]
input = '\n'.join(texts)
try:
import openai
@@ -109,13 +118,21 @@ def summarize_request(ranked_docs: list[tuple[str, float]]) -> str:
)

@retry(
retry=retry_if_exception_type((openai.RateLimitError, openai.APITimeoutError)),
retry=retry_if_exception_type(
(
openai.RateLimitError,
openai.APITimeoutError,
openai.APIConnectionError,
openai.ConflictError,
openai.InternalServerError,
)
),
wait=wait_random_exponential(multiplier=0.5, max=60),
stop=stop_after_attempt(10),
stop=stop_after_attempt(_NUM_RETRIES),
)
def request_with_retries() -> str:
max_tokens = 50
while max_tokens <= 200:
max_tokens = _INITIAL_MAX_TOKENS
while max_tokens <= _FINAL_MAX_TOKENS:
try:
title = _openai_client().chat.completions.create(
model='gpt-3.5-turbo-1106',
@@ -126,22 +143,25 @@ def request_with_retries() -> str:
{
'role': 'system',
'content': (
'You are a world-class title generator. Ignore the group of related requests '
'below, and generate a short title to describe the common theme. Some examples: '
'You are a world-class short title generator. Ignore the related snippets below '
'and generate a short title to describe their common theme. Some examples: '
'"YA book reviews", "Questions about South East Asia", "Translating English to '
'Polish", "Writing product descriptions", etc. Prefer using descriptive words. Do '
'not use vague words like "various", "assortment", "comments", "discussion", etc.'
'Polish", "Writing product descriptions", etc. Use descriptive words. If the '
"snippet's language is different than English, mention it in the title, e.g. "
'"Cooking questions in Spanish". Avoid vague words like "various", "assortment", '
'"comments", "discussion", etc.'
),
},
{'role': 'user', 'content': input},
],
)
return title.title
except IncompleteOutputException:
max_tokens = max_tokens * 2
max_tokens += _INITIAL_MAX_TOKENS
log(f'Retrying with max_tokens={max_tokens}')
log(f'Could not generate a short title for input:\n{input}')
return 'FAILED_TO_GENERATE'
# We return a string instead of None, since None is emitted when the text column is sparse.
return 'FAILED_TO_TITLE'

return request_with_retries()

@@ -167,29 +187,45 @@ def generate_category(ranked_docs: list[tuple[str, float]]) -> str:
)

@retry(
retry=retry_if_exception_type((openai.RateLimitError, openai.APITimeoutError)),
retry=retry_if_exception_type(
(
openai.RateLimitError,
openai.APITimeoutError,
openai.APIConnectionError,
openai.ConflictError,
openai.InternalServerError,
)
),
wait=wait_random_exponential(multiplier=0.5, max=60),
stop=stop_after_attempt(10),
stop=stop_after_attempt(_NUM_RETRIES),
)
def request_with_retries() -> str:
category = _openai_client().chat.completions.create(
model='gpt-3.5-turbo-1106',
response_model=Category,
temperature=0.0,
max_tokens=50,
messages=[
{
'role': 'system',
'content': (
'You are a world-class category labeler. Generate a short category name for the '
'provided titles. For example, given two titles "translating english to polish" and '
'"translating korean to english", generate "Translation".'
),
},
{'role': 'user', 'content': input},
],
)
return category.category
max_tokens = _INITIAL_MAX_TOKENS
while max_tokens <= _FINAL_MAX_TOKENS:
try:
category = _openai_client().chat.completions.create(
model='gpt-3.5-turbo-1106',
response_model=Category,
temperature=0.0,
max_tokens=max_tokens,
messages=[
{
'role': 'system',
'content': (
'You are a world-class category labeler. Generate a short category name for the '
'provided titles. For example, given two titles "translating english to polish" '
'and "translating korean to english", generate "Translation".'
),
},
{'role': 'user', 'content': input},
],
)
return category.category
except IncompleteOutputException:
max_tokens += _INITIAL_MAX_TOKENS
log(f'Retrying with max_tokens={max_tokens}')
log(f'Could not generate a short category for input:\n{input}')
return 'FAILED_TO_GENERATE'

return request_with_retries()

@@ -257,7 +293,7 @@ def cluster_impl(
dataset: Dataset,
input_fn_or_path: Union[Path, Callable[[Item], str], DatasetFormatInputSelector],
output_path: Optional[Path] = None,
min_cluster_size: int = 5,
min_cluster_size: int = MIN_CLUSTER_SIZE,
topic_fn: TopicFn = summarize_request,
overwrite: bool = False,
use_garden: bool = False,
@@ -347,11 +383,11 @@ def extract_text(item: Item) -> Item:
cluster_ids_exists = schema.has_field((*cluster_output_path, CLUSTER_ID))
if not cluster_ids_exists or overwrite:
if task_info:
task_info.message = 'Computing clusters'
task_info.message = 'Clustering documents'
task_info.total_progress = 0
task_info.total_len = None

def compute_clusters(items: Iterator[Item]) -> Iterator[Item]:
def cluster_documents(items: Iterator[Item]) -> Iterator[Item]:
items, items2 = itertools.tee(items)
docs: Iterator[Optional[str]] = (item.get(TEXT_COLUMN) for item in items)
cluster_items = sparse_to_dense_compute(
@@ -365,7 +401,7 @@ def compute_clusters(items: Iterator[Item]) -> Iterator[Item]:

# Compute the clusters.
dataset.transform(
compute_clusters,
cluster_documents,
input_path=cluster_output_path,
output_path=cluster_output_path,
overwrite=True,
@@ -374,11 +410,11 @@ def compute_clusters(items: Iterator[Item]) -> Iterator[Item]:
cluster_titles_exist = schema.has_field((*cluster_output_path, CLUSTER_TITLE))
if not cluster_titles_exist or overwrite or recompute_titles:
if task_info:
task_info.message = 'Computing cluster titles'
task_info.message = 'Titling clusters'
task_info.total_progress = 0
task_info.total_len = total_len

def compute_cluster_titles(items: Iterator[Item]) -> Iterator[Item]:
def title_clusters(items: Iterator[Item]) -> Iterator[Item]:
items, items2 = itertools.tee(items)
titles = _compute_titles(
items,
@@ -392,7 +428,7 @@ def compute_cluster_titles(items: Iterator[Item]) -> Iterator[Item]:
yield {**item, CLUSTER_TITLE: title}

dataset.transform(
compute_cluster_titles,
title_clusters,
input_path=cluster_output_path,
output_path=cluster_output_path,
sort_by=(*cluster_output_path, CLUSTER_ID),
@@ -402,15 +438,15 @@ def compute_cluster_titles(items: Iterator[Item]) -> Iterator[Item]:
category_id_exists = schema.has_field((*cluster_output_path, CATEGORY_ID))
if not category_id_exists or overwrite or recompute_titles:
if task_info:
task_info.message = 'Computing super clusters'
task_info.message = 'Clustering titles'
task_info.total_progress = 0
task_info.total_len = None

def compute_category_clusters(items: Iterator[Item]) -> Iterator[Item]:
def cluster_titles(items: Iterator[Item]) -> Iterator[Item]:
items, items2 = itertools.tee(items)
docs = (item.get(CLUSTER_TITLE) for item in items)
cluster_items = sparse_to_dense_compute(
docs, lambda x: _hdbscan_cluster(x, min_cluster_size, use_garden)
docs, lambda x: _hdbscan_cluster(x, MIN_CLUSTER_SIZE_CATEGORY, use_garden)
)
for item, cluster_item in zip(items2, cluster_items):
item[CATEGORY_ID] = (cluster_item or {}).get(CLUSTER_ID, -1)
@@ -419,7 +455,7 @@ def compute_category_clusters(items: Iterator[Item]) -> Iterator[Item]:

# Compute the clusters.
dataset.transform(
compute_category_clusters,
cluster_titles,
input_path=cluster_output_path,
output_path=cluster_output_path,
overwrite=True,
@@ -429,11 +465,11 @@ def compute_category_clusters(items: Iterator[Item]) -> Iterator[Item]:
category_title_exists = schema.has_field(category_title_path)
if not category_title_exists or overwrite or recompute_titles:
if task_info:
task_info.message = 'Computing category titles'
task_info.message = 'Titling categories'
task_info.total_progress = 0
task_info.total_len = total_len

def compute_category_titles(items: Iterator[Item]) -> Iterator[Item]:
def title_categories(items: Iterator[Item]) -> Iterator[Item]:
items, items2 = itertools.tee(items)
titles = _compute_titles(
items,
@@ -445,11 +481,12 @@ def compute_category_titles(items: Iterator[Item]) -> Iterator[Item]:
)
for item, title in zip(items2, titles):
# Drop the temporary newline-concatenated text column.
del item[TEXT_COLUMN]
if TEXT_COLUMN in item:
del item[TEXT_COLUMN]
yield {**item, CATEGORY_TITLE: title}

dataset.transform(
compute_category_titles,
title_categories,
input_path=cluster_output_path,
output_path=cluster_output_path,
sort_by=(*cluster_output_path, CATEGORY_ID),
@@ -494,7 +531,7 @@ def drop_temp_text_column(items: Iterator[Item]) -> Iterator[Item]:

def _hdbscan_cluster(
docs: Iterator[str],
min_cluster_size: int = MIN_CLUSTER_SIZE,
min_cluster_size: int,
use_garden: bool = False,
num_docs: Optional[int] = None,
task_info: Optional[TaskInfo] = None,
@@ -504,7 +541,7 @@ def _hdbscan_cluster(
remote_fn = modal.Function.lookup('cluster', 'Cluster.cluster').remote
with DebugTimer('Compressing docs for clustering remotely'):
gzipped_docs = compress_docs(list(docs))
response = remote_fn({'gzipped_docs': gzipped_docs})
response = remote_fn({'gzipped_docs': gzipped_docs, 'min_cluster_size': min_cluster_size})
yield from response['clusters']

if task_info:
7 changes: 7 additions & 0 deletions lilac/data/clustering_test.py
@@ -87,6 +87,7 @@ def topic_fn(docs: list[tuple[str, float]]) -> str:
return 'simplification'
return 'other'

mocker.patch.object(clustering, 'MIN_CLUSTER_SIZE_CATEGORY', 2)
mocker.patch.object(clustering, 'generate_category', return_value='MockCategory')
_mock_jina(mocker)

@@ -236,6 +237,7 @@ def test_nested_clusters(make_test_data: TestDataMaker, mocker: MockerFixture) -
{'text': 'Give me simplified version of this text'},
],
]
mocker.patch.object(clustering, 'MIN_CLUSTER_SIZE_CATEGORY', 2)
mocker.patch.object(clustering, 'generate_category', return_value='MockCategory')
dataset = make_test_data([{'texts': t} for t in texts])

@@ -307,6 +309,7 @@ def topic_fn(docs: list[tuple[str, float]]) -> str:
return 'b_cluster'
return 'other'

mocker.patch.object(clustering, 'MIN_CLUSTER_SIZE_CATEGORY', 2)
_mock_jina(mocker)
dataset.cluster('texts.*', min_cluster_size=2, topic_fn=topic_fn)
rows = list(dataset.select_rows(combine_columns=True))
@@ -356,6 +359,7 @@ def test_clusters_with_fn(make_test_data: TestDataMaker, mocker: MockerFixture)
]
dataset = make_test_data([{'texts': t} for t in texts])
mocker.patch.object(clustering, 'generate_category', return_value='MockCategory')
mocker.patch.object(clustering, 'MIN_CLUSTER_SIZE_CATEGORY', 2)

def topic_fn(docs: list[tuple[str, float]]) -> str:
if 'summar' in docs[0][0]:
@@ -440,6 +444,7 @@ def test_clusters_with_fn_output_is_under_a_dict(
]
mocker.patch.object(clustering, 'generate_category', return_value='MockCategory')
dataset = make_test_data([{'texts': t, 'info': {'dummy': True}} for t in texts])
mocker.patch.object(clustering, 'MIN_CLUSTER_SIZE_CATEGORY', 2)

def topic_fn(docs: list[tuple[str, float]]) -> str:
if 'summar' in docs[0][0]:
@@ -557,6 +562,7 @@ def topic_fn(docs: list[tuple[str, float]]) -> str:
return 'time'
return 'other'

mocker.patch.object(clustering, 'MIN_CLUSTER_SIZE_CATEGORY', 2)
_mock_jina(mocker)
dataset.cluster(
ShareGPT.human,
@@ -655,6 +661,7 @@ def topic_fn(docs: list[tuple[str, float]]) -> str:

signal = TestSignal()
dataset.compute_signal(signal, 'text')
mocker.patch.object(clustering, 'MIN_CLUSTER_SIZE_CATEGORY', 2)
_mock_jina(mocker)

dataset.cluster('text', min_cluster_size=2, topic_fn=topic_fn)
(Diffs for the remaining 13 changed files are not shown.)