Fix OOM issue, prevent caching and miner sync bugfix #99

Open
wants to merge 8 commits into base: main
28 changes: 13 additions & 15 deletions logicnet/base/miner.py
@@ -61,6 +61,7 @@ def run(self):

# Check that miner is registered on the network.
self.sync()
last_sync_block = self.block

# Serve passes the axon information to the network + netuid we are hosting on.
# This will auto-update if the axon port or external IP has changed.
@@ -69,28 +70,25 @@
)
self.axon.serve(netuid=self.config.netuid, subtensor=self.subtensor)

# Start starts the miner's axon, making it active on the network.
self.axon.start()

bt.logging.info(f"\033[1;32m🧠 Miner starting at block: {self.block}\033[0m")

# This loop maintains the miner's operations until intentionally stopped.
try:
RESYNC_INTERVAL = self.config.neuron.epoch_length # resync every self.config.neuron.epoch_length blocks
SLEEP_TIME = 30 # sleep time between checks (seconds)

while not self.should_exit:
while (
self.block - self.metagraph.last_update[self.uid]
< self.config.neuron.epoch_length
):
# Wait before checking again.
time.sleep(1)

# Check if we should exit.
if self.should_exit:
break

# Sync metagraph and potentially set weights.
self.sync()
self.step += 1
try:
if self.block - last_sync_block > RESYNC_INTERVAL:
self.sync()
self.step += 1
last_sync_block = self.block
time.sleep(SLEEP_TIME)
except Exception as e:
bt.logging.error(f"\033[1;31m❌ Miner exception: {e}\033[0m")

# If someone intentionally stops the miner, it'll safely terminate operations.
except KeyboardInterrupt:
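Net effect of this hunk: the old loop polled every second and resynced as soon as the metagraph's last_update lagged by an epoch, while the new loop sleeps 30 seconds at a time and resyncs only once more than epoch_length blocks have passed since the last local sync. A minimal standalone sketch of the new behavior, assuming a miner object exposing block, sync(), step, and should_exit as in the class above (run_loop itself is a hypothetical wrapper, not part of the PR):

import time

def run_loop(miner, resync_interval: int, sleep_time: float = 30.0) -> None:
    last_sync_block = miner.block
    while not miner.should_exit:
        try:
            # Resync only after enough blocks have elapsed since the last
            # sync, instead of re-checking the metagraph every second.
            if miner.block - last_sync_block > resync_interval:
                miner.sync()
                miner.step += 1
                last_sync_block = miner.block
            time.sleep(sleep_time)
        except Exception as e:
            print(f"Miner exception: {e}")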
11 changes: 9 additions & 2 deletions logicnet/utils/config.py
@@ -136,8 +136,8 @@ def add_args(cls, parser):
parser.add_argument(
"--async_batch_size",
type=int,
help="The number of threads to run in a single loop.",
default=16,
help="Validator query batch size.",
default=32,
)

parser.add_argument(
@@ -205,6 +205,13 @@ def add_args(cls, parser):
default="60,20,20",
)

parser.add_argument(
"--penalty_threshold",
type=float,
help="The similarity threshold for penalty",
default=0.95,
)

else:
parser.add_argument(
"--miner.category",
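The validator branch gains a --penalty_threshold flag whose value is handed to LogicRewarder (see rewarder.py below). A quick sketch of how such a flag parses, reusing the argparse definition from the hunk above (the standalone parser is illustrative, not the project's real config object):

import argparse

parser = argparse.ArgumentParser()
parser.add_argument(
    "--penalty_threshold",
    type=float,
    help="The similarity threshold above which the copy penalty is applied",
    default=0.95,
)

args = parser.parse_args(["--penalty_threshold", "0.9"])
assert args.penalty_threshold == 0.9  # overrides the 0.95 default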
151 changes: 105 additions & 46 deletions logicnet/validator/rewarder.py
@@ -1,14 +1,19 @@
import torch
import openai
import sympy
import random
import bittensor as bt
from concurrent import futures
from logicnet.protocol import LogicSynapse

import bittensor as bt
import openai
import sympy
import torch
from rapidfuzz import fuzz
from sentence_transformers import SentenceTransformer

from logicnet.protocol import LogicSynapse
from logicnet.utils.model_selector import model_selector
from logicnet.utils.regex_helper import extract_numbers
from logicnet.validator.prompt import DETECT_TRICK_TEMPLATE, CORRECTNESS_TEMPLATE, EXTRACT_ANSWER_PROMPT
from logicnet.validator.prompt import (CORRECTNESS_TEMPLATE,
DETECT_TRICK_TEMPLATE,
EXTRACT_ANSWER_PROMPT)

SIMILARITY_WEIGHT = 0.3
CORRECTNESS_WEIGHT = 0.7
@@ -17,20 +22,23 @@


class LogicRewarder:
def __init__(self, model_rotation_pool: dict):
def __init__(self, model_rotation_pool: dict, penalty_threshold: float):
"""
READ HERE TO LEARN HOW THE VALIDATOR REWARDS THE MINER
"""
self.model_rotation_pool = model_rotation_pool
self.embedder = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
self.penalty_threshold = penalty_threshold

def __call__(self, uids, responses: list[LogicSynapse], base_synapse: LogicSynapse):
"""Calculate reward for each response using similarity, correctness, and processing time.
def __call__(self, reward_uids, reward_responses: list[LogicSynapse], non_reward_responses: list[LogicSynapse], base_synapse: LogicSynapse):
"""
Calculate reward for each response using similarity, correctness, and processing time.
This method also applies a penalty to responses that have high similarity to other responses in the same batch, which implies they are copying each other.

Args:
task_uid (int): Unique task UID.
uids (list[int]): List of miner UIDs.
responses (list[LogicSynapse]): Synapse responses from miners.
reward_uids (list[int]): List of miner UIDs to be rewarded.
reward_responses (list[LogicSynapse]): Synapse responses from miners to be rewarded.
non_reward_responses (list[LogicSynapse]): Synapse responses from miners to be penalized.
base_synapse (LogicSynapse): Base synapse containing the ground truth and raw logic question.

Returns:
@@ -39,76 +47,86 @@ def __call__(self, uids, responses: list[LogicSynapse], base_synap
# Get the unique task UID from the base_synapse
task_uid = base_synapse.task_uid

valid_uids = [
uid for uid, response in zip(uids, responses) if response.is_success
valid_reward_uids = [
uid for uid, response in zip(reward_uids, reward_responses) if response.is_success
]
valid_responses = [response for response in responses if response.is_success]
invalid_uids = [
uid for uid, response in zip(uids, responses) if not response.is_success
valid_reward_responses = [response for response in reward_responses if response.is_success]
valid_non_reward_responses = [response for response in non_reward_responses if response.is_success]
invalid_reward_uids = [
uid for uid, response in zip(reward_uids, reward_responses) if not response.is_success
]
bt.logging.info(f"Valid UIDs: {valid_uids}")
bt.logging.info(f"Invalid UIDs: {invalid_uids}")
invalid_rewards = [0 for _ in invalid_uids]
bt.logging.info(f"Valid reward UIDs: {valid_reward_uids}")
bt.logging.info(f"Invalid reward UIDs: {invalid_reward_uids}")
invalid_rewards = [0 for _ in invalid_reward_uids]
reward_logs = []
valid_rewards = []

if valid_uids:
if valid_reward_uids:
ref_ground_truth: str = self._get_ground_truth(
base_synapse.raw_logic_question
)
response_texts = [response.logic_reasoning for response in valid_responses]
similarities = self._get_similarity(ref_ground_truth, response_texts)
correctness = self._get_correctness(base_synapse, valid_responses)
reward_response_texts = [response.logic_reasoning for response in valid_reward_responses]
non_reward_response_texts = [response.logic_reasoning for response in valid_non_reward_responses]

similarities = self._get_similarity(ref_ground_truth, reward_response_texts)
correctness = self._get_correctness(base_synapse, valid_reward_responses)
penalties = self._get_penalties(reward_response_texts, non_reward_response_texts)
process_times = [
response.dendrite.process_time for response in valid_responses
response.dendrite.process_time for response in valid_reward_responses
]
timeout = base_synapse.timeout

for i in range(len(valid_responses)):
for i in range(len(valid_reward_responses)):
reward = (
SIMILARITY_WEIGHT * similarities[i]
+ CORRECTNESS_WEIGHT * correctness[i]
+ PROCESSING_TIME_WEIGHT * min(process_times[i] / timeout, 1)
)


# Scale up the reward
reward = reward / 2 + 0.5

# Apply penalty to the reward
if penalties[i] > self.penalty_threshold:
reward = reward * (1 - penalties[i])

# Scale up the reward
reward = reward / 2 + 0.5
valid_rewards.append(reward)

try:
reward_info = {
"task_uid": task_uid,
"miner_uid": valid_uids[i],
"miner_uid": valid_reward_uids[i],
"reward": reward,
"similarity": similarities[i],
"correctness": correctness[i],
"process_time": process_times[i],
"miner_response": valid_responses[i].logic_answer.strip(),
"miner_reasoning": response_texts[i],
"penalty": penalties[i],
"miner_response": valid_reward_responses[i].logic_answer.strip(),
"miner_reasoning": reward_response_texts[i],
"question": base_synapse.raw_logic_question,
"logic_question": base_synapse.logic_question,
"ground_truth": base_synapse.ground_truth_answer,
"ref_ground_truth": ref_ground_truth,
}

reward_logs.append(reward_info)

except Exception as e:
bt.logging.error(f"Error in logging reward for valid miners: {e}")


total_uids = valid_uids + invalid_uids
total_uids = valid_reward_uids + invalid_reward_uids
rewards = valid_rewards + invalid_rewards

# Append reward logs for invalid UIDs
for invalid_uid in invalid_uids:
for invalid_uid in invalid_reward_uids:
reward_logs.append({
"task_uid": task_uid,
"miner_uid": invalid_uid,
"reward": 0,
"similarity": 0,
"correctness": 0,
"process_time": 0,
"penalty": 0,
"miner_response": "",
"miner_reasoning": "",
"question": base_synapse.raw_logic_question,
@@ -359,22 +377,63 @@ def _get_similarity(self, ground_truth: str, responses: list[str]):
list[float]: List of similarity scores for each response.
"""
try:
ground_truth_embedding = self.embedder.encode(ground_truth)
response_embeddings = self.embedder.encode(responses)
ground_truth_embedding = self.embedder.encode(ground_truth, convert_to_tensor=True).cpu()
response_embeddings = self.embedder.encode(responses, convert_to_tensor=True).cpu()

# Calculate similarity
similarities = []
for response_embedding in response_embeddings:
similarity = torch.nn.functional.cosine_similarity(
torch.tensor(ground_truth_embedding),
torch.tensor(response_embedding),
dim=0,
)
similarities.append(similarity.item())
return similarities
similarities = torch.nn.functional.cosine_similarity(
ground_truth_embedding,
response_embeddings,
dim=1,
)
return similarities.tolist()
except Exception as e:
bt.logging.warning(f"Failed to calculate similarity.\nError: {e}")
return [0.5] * len(responses)

def _get_penalties(self, reward_responses_texts: list[str], non_reward_responses_texts: list[str]):
"""Calculate penalties for each response.
Penalty is calculated based on the similarity between the response and the other responses using embedding similarity and Levenshtein distance.

Args:
reward_responses_texts (list[str]): List of responses from miners to be rewarded.
non_reward_responses_texts (list[str]): List of responses from miners to be penalized.

Returns:
list[float]: List of penalties for each response, 0 if the response is not penalized, 1 if the response is highly penalized.
"""
penalties = []
try:
all_responses = reward_responses_texts + non_reward_responses_texts

# Calculate embedding similarity
embeddings = self.embedder.encode(all_responses, convert_to_tensor=True).cpu()
normalized_embeddings = torch.nn.functional.normalize(embeddings, p=2, dim=1)
embedding_similarity_matrix = torch.matmul(normalized_embeddings, normalized_embeddings.T)
# We don't want to penalize the response with itself
embedding_similarity_matrix = embedding_similarity_matrix - torch.eye(embedding_similarity_matrix.shape[0])
reward_responses_embedding_similarity = embedding_similarity_matrix[:len(reward_responses_texts)].max(dim=1).values # consider the max similarity for each reward response
reward_responses_embedding_similarity = (reward_responses_embedding_similarity + 1) / 2

# Calculate Levenshtein distance
levenshtein_similarity_matrix = torch.zeros((len(all_responses), len(all_responses)))
for i in range(len(all_responses)):
for j in range(len(all_responses)):
if i != j:
levenshtein_similarity_matrix[i, j] = fuzz.partial_ratio(all_responses[i], all_responses[j]) / 100
else:
levenshtein_similarity_matrix[i, j] = 0

reward_responses_levenshtein_similarity = levenshtein_similarity_matrix[:len(reward_responses_texts)].max(dim=1).values # consider the max similarity for each reward response

penalties = (reward_responses_embedding_similarity + reward_responses_levenshtein_similarity) / 2
penalties = penalties.clip(0, 1)
penalties = [float(penalty) for penalty in penalties.tolist()]
return penalties

except Exception as e:
bt.logging.warning(f"Failed to calculate penalties.\nError: {e}")
return [0.0] * len(reward_responses_texts)

def _get_ground_truth(self, question: str):
"""Generate self-generated ground truth based on the question.
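Taken together, the rewarder changes do three things: _get_similarity now encodes once and runs a single batched cosine_similarity on CPU tensors (plausibly the OOM fix named in the PR title); _get_penalties scores each rewarded response by its maximum similarity to any other response in the batch, averaging a rescaled embedding cosine ((cos + 1) / 2) with rapidfuzz's partial_ratio / 100; and __call__ keeps only (1 - penalty) of the raw reward when that score exceeds penalty_threshold. A toy sketch of the combination step with made-up inputs (the PROCESSING_TIME_WEIGHT constant is truncated out of the diff, so it is a parameter here):

def apply_reward(similarity: float, correctness: float, process_time: float,
                 timeout: float, penalty: float, penalty_threshold: float,
                 processing_time_weight: float) -> float:
    # Weighted raw score, mirroring the loop in LogicRewarder.__call__.
    reward = (0.3 * similarity          # SIMILARITY_WEIGHT
              + 0.7 * correctness      # CORRECTNESS_WEIGHT
              + processing_time_weight * min(process_time / timeout, 1))
    # Copy penalty: above the threshold, keep only (1 - penalty) of the score.
    if penalty > penalty_threshold:
        reward = reward * (1 - penalty)
    # Final rescale into [0.5, 1.0], as in the diff.
    return reward / 2 + 0.5

# A near-duplicate answer (penalty 0.97 > threshold 0.95) keeps only 3% of
# its raw score before rescaling; 0.1 is a made-up processing-time weight:
print(apply_reward(0.9, 1.0, 4.0, 64.0, 0.97, 0.95, 0.1))  # ~0.515
print(apply_reward(0.9, 1.0, 4.0, 64.0, 0.50, 0.95, 0.1))  # ~0.988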
54 changes: 34 additions & 20 deletions neurons/validator/core/serving_queue.py
@@ -86,28 +86,42 @@ def get_batch_query(self, batch_size: int):
if not self.total_uids_remaining:
return
more_data = True
initial_total = self.total_uids_remaining

while more_data:
more_data = False
for category, q in self.synthentic_queue.items():
if q.empty():
continue
time_to_sleep = self.time_per_loop * (
min(batch_size / (self.total_uids_remaining + 1), 1)
)
uids_to_query = []
should_rewards = []

while len(uids_to_query) < batch_size and not q.empty():
more_data = True
query_item = q.get()
uids_to_query.append(query_item.uid)
should_rewards.append(self.random_should_reward(query_item.uid))

if query_item.uid not in self.synthentic_rewarded:
self.synthentic_rewarded[query_item.uid] = 0
self.synthentic_rewarded[query_item.uid] += 1

yield category, uids_to_query, should_rewards, time_to_sleep
# Get non-empty categories
available_categories = [
category for category, q in self.synthentic_queue.items()
if q and not q.empty()
]

if not available_categories:
continue

# Randomly select one category
category = random.choice(available_categories)
q = self.synthentic_queue.get(category)

# Calculate progress through queue (0.0 to 1.0)
progress = 1 - (self.total_uids_remaining / max(initial_total, 1))
time_to_sleep = self.time_per_loop * min(batch_size / (initial_total + 1), 1) * math.exp(progress)

uids_to_query = []
should_rewards = []

while len(uids_to_query) < batch_size and not q.empty():
more_data = True
query_item = q.get()
uids_to_query.append(query_item.uid)
should_rewards.append(self.random_should_reward(query_item.uid))

if query_item.uid not in self.synthentic_rewarded:
self.synthentic_rewarded[query_item.uid] = 0
self.synthentic_rewarded[query_item.uid] += 1
self.total_uids_remaining -= 1

yield category, uids_to_query, should_rewards, time_to_sleep

def random_should_reward(self, uid):
if uid not in self.synthentic_rewarded or self.synthentic_rewarded[uid] < 2:
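In the reworked get_batch_query, each pass drains one randomly chosen non-empty category, and the sleep between batches stretches exponentially as the epoch's UID pool empties: progress runs from 0 to 1, so the sleep grows by up to a factor of e. Note the new code calls math.exp and random.choice, so serving_queue.py needs the corresponding imports, which this excerpt does not show. A quick numeric check of the sleep formula with made-up values (600 s per loop, batch size 32, 256 UIDs in the epoch):

import math

def time_to_sleep(time_per_loop: float, batch_size: int,
                  initial_total: int, remaining: int) -> float:
    # progress: 0.0 when the epoch starts, 1.0 once every UID is consumed.
    progress = 1 - remaining / max(initial_total, 1)
    return time_per_loop * min(batch_size / (initial_total + 1), 1) * math.exp(progress)

print(round(time_to_sleep(600, 32, 256, 256), 1))  # 74.7  (start of epoch)
print(round(time_to_sleep(600, 32, 256, 0), 1))    # 203.1 (queue drained)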