Skip to content

Commit

Permalink
swap to short polling for stability
Browse files Browse the repository at this point in the history
  • Loading branch information
abstxn committed Oct 19, 2024
1 parent b715d8a commit 0650da9
Show file tree
Hide file tree
Showing 9 changed files with 261 additions and 153 deletions.
49 changes: 32 additions & 17 deletions db-init/init-kafka.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,39 @@ const { Kafka } = require('kafkajs');
const kafka = new Kafka({ brokers: ['kafka:9092'] });
const admin = kafka.admin();

async function createTopicIfNotExists(topicName, partitionCount) {
await admin.connect();
async function createTopicsIfNotExists(topicsToCreate) {
try {
await admin.connect();

const topics = await admin.listTopics();
if (!topics.includes(topicName)) {
await admin.createTopics({
topics: [{ topic: topicName, numPartitions: partitionCount, replicationFactor: 1 }],
});
console.log(`Topic ${topicName} created`);
} else {
console.log(`Topic ${topicName} already exists`);
}
// List existing topics
const existingTopics = await admin.listTopics();

// Filter out the topics that already exist
const topicsToActuallyCreate = topicsToCreate.filter(topic => !existingTopics.includes(topic.topic));

await admin.disconnect();
if (topicsToActuallyCreate.length > 0) {
// Create topics that don't exist
await admin.createTopics({
topics: topicsToActuallyCreate,
});
console.log(`Created topics: ${topicsToActuallyCreate.map(t => t.topic).join(', ')}`);
} else {
console.log('All topics already exist.');
}
} catch (error) {
console.error('Error creating topics:', error);
} finally {
await admin.disconnect();
}
}

// Create Topics for matchmaking events
createTopicIfNotExists('match-events', 3);
createTopicIfNotExists('cancel-match-events', 3);
createTopicIfNotExists('match-found-events', 3);
createTopicIfNotExists('dequeue-events', 3);
// Define the topics to create
const topics = [
{ topic: 'match-events', numPartitions: 3, replicationFactor: 1 },
{ topic: 'cancel-match-events', numPartitions: 3, replicationFactor: 1 },
{ topic: 'match-found-events', numPartitions: 3, replicationFactor: 1 },
{ topic: 'dequeue-events', numPartitions: 3, replicationFactor: 1 }
];

// Create the topics if they do not exist
createTopicsIfNotExists(topics);
158 changes: 78 additions & 80 deletions frontend-service/src/pages/MatchingPage.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -5,145 +5,143 @@ import Countdown from "../../components/matchmaking/Countdown";
import MatchUnsuccess from "../../components/matchmaking/MatchUnsuccess";
import MatchSuccess from "../../components/matchmaking/MatchSuccess";

// Define constants for match stages
const STAGE = {
MATCHME: "matchme",
COUNTDOWN: "countdown",
SUCCESS: "success",
UNSUCCESSFUL: "unsuccessful",
};

const MatchingPage: React.FC = () => {
const [stage, setStage] = useState("matchme");
const [stage, setStage] = useState(STAGE.MATCHME);
const [selectedTopic, setSelectedTopic] = useState("");
const [selectedDifficulty, setSelectedDifficulty] = useState("");
const [loading, setLoading] = useState(false); // New loading state
const navigate = useNavigate();

// Helper function to handle fetch requests with error handling
const fetchWithAuth = async (url: string, options: RequestInit = {}) => {
const token = localStorage.getItem("token");
const headers = {
"Content-Type": "application/json",
Authorization: `Bearer ${token}`,
...options.headers,
};

try {
const response = await fetch(url, { ...options, headers });
if (!response.ok) throw new Error(`Error: ${response.status}`);
return await response.json();
} catch (error) {
console.error("Request failed:", error);
throw error;
}
};

// Trigger handlers according to match status in server
const checkMatchStatus = async () => {
const response = await fetch("http://localhost:3002/match-status", {
method: "GET",
headers: {
"Content-Type": "application/json",
Authorization: `Bearer ${localStorage.getItem("token")}`
}
});
const result = await response.json();
const matchStatus = result["matchStatus"];
handleMatchStatusReceived(matchStatus);
// setLoading(true); // Indicate loading
try {
const result = await fetchWithAuth("http://localhost:3002/match-status");
const matchStatus = result.matchStatus;
console.log(matchStatus);
handleMatchStatusReceived(matchStatus);
} catch {
console.error("Failed to check match status.");
} finally {
// setLoading(false); // Remove loading
}
};
const handleMatchStatusReceived = (matchStatus:string) => {

const handleMatchStatusReceived = (matchStatus: string) => {
if (matchStatus == "isNotMatching") {
// handleCancel();
setStage("matchme");
setStage(STAGE.MATCHME);
} else if (matchStatus == "isMatching") {
// handleMatchMe();
setStage("countdown");
fetch("http://localhost:3002/continue-matching", {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `Bearer ${localStorage.getItem("token")}`
},
})
.then(response => response.json())
.then(data => {
console.log("matching-service response: ", data)
if (data.message == "Match found!") {
handleMatchFound();
}
})
.catch(err => {console.error(err)});
setStage(STAGE.COUNTDOWN);
} else if (matchStatus == "isMatched") {
handleMatchFound();
}
};

const handleMatchMe = () => {
const getMatchLongPoll = () => {
fetch("http://localhost:3002/find-match", {
// Simply send a find match request to be put in the queue
const handleMatchMe = async () => {
setStage(STAGE.COUNTDOWN);
try {
await fetchWithAuth("http://localhost:3002/find-match", {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `Bearer ${localStorage.getItem("token")}`
},
body: JSON.stringify(
{
topic: selectedTopic,
difficulty: selectedDifficulty
}
)
})
.then(response => response.json())
.then(data => {
console.log("matching-service response: ", data)
if (data.message == "Match found!") {
handleMatchFound();
}
})
.catch(err => {console.error(err)});
};

setStage("countdown");
getMatchLongPoll();
body: JSON.stringify({ topic: selectedTopic, difficulty: selectedDifficulty }),
});
} catch (error) {
console.error("Failed to find match: ", error);
}
};

const handleMatchFound = () => {
setStage("success");
setStage(STAGE.SUCCESS);
};

const handleMatchUnsuccess = () => {
setStage("unsuccessful");
setStage(STAGE.UNSUCCESSFUL);
};

const handleRetry = () => {
setStage("countdown");
setStage(STAGE.COUNTDOWN);
};

const handleCancel = () => {
const sendCancelMatchingRequest = () => {
fetch("http://localhost:3002/cancel-matching", {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `Bearer ${localStorage.getItem("token")}`
}
})
}

sendCancelMatchingRequest();

setStage("matchme");
const handleCancel = async () => {
// try {
// await fetchWithAuth("http://localhost:3002/cancel-matching", { method: "POST" });
// } catch (error) {
// console.error("Failed to cancel matching.");
// }
setStage(STAGE.MATCHME);
};

const handleBackToDashboard = () => {
navigate("/dashboard");
};


// Ensure that when the page is loaded/reloaded, the stage state is always
// correct with respect to the actual user's match state in backend.
useEffect(() => {
checkMatchStatus();
const interval = setInterval(() => {
checkMatchStatus();
}, 2000);
return () => clearInterval(interval); // Cleanup on unmount
}, []);

return (
<div>
{stage === "matchme" &&
{loading && <p>Loading...</p>} {/* Show loading message */}

{stage === STAGE.MATCHME && (
<MatchMe
onMatchMe={handleMatchMe}
selectedTopic={selectedTopic}
updateSelectedTopic={setSelectedTopic}
selectedDifficulty={selectedDifficulty}
updateSelectedDifficulty={setSelectedDifficulty}
/>
}
{stage === "countdown" && (
)}

{stage === STAGE.COUNTDOWN && (
<Countdown
onSuccess={handleMatchFound}
onFailure={handleMatchUnsuccess}
onCancel={handleCancel}
/>
)}
{stage === "unsuccessful" && (

{stage === STAGE.UNSUCCESSFUL && (
<MatchUnsuccess
onRetry={handleRetry}
onBackToDashboard={handleBackToDashboard}
/>
)}
{stage === "success" && <MatchSuccess />}

{stage === STAGE.SUCCESS && <MatchSuccess />}
</div>
);
};
Expand Down
6 changes: 3 additions & 3 deletions matching-service/matcher-service/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import express from 'express';
import { Kafka } from 'kafkajs';
import cors from "cors";

const app = express();

Expand All @@ -9,6 +10,7 @@ const kafkaConsumer = kafka.consumer({ groupId: 'matcher-service' });


app.use(express.json());
app.use(cors());


// TODO: Implement PQ instead of just FIFO
Expand Down Expand Up @@ -41,8 +43,6 @@ let matchRequests: MatchRequestData[] = [];
// Continuous function to periodically run the matching algorithm
const runMatchingAlgorithm = async () => {
setInterval(async () => {
console.log("Running matching algorithm")
console.log(matchRequests);
if (matchRequests.length > 1) {
const matchReqDataA = matchRequests.shift();
const matchReqDataB = matchRequests.shift();
Expand All @@ -63,7 +63,7 @@ const runMatchingAlgorithm = async () => {
});
}
}
}, 5000);
}, 2000);
};

// Start the producer and the continuous matching algorithm
Expand Down
34 changes: 34 additions & 0 deletions matching-service/matcher-service/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions matching-service/matcher-service/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"license": "ISC",
"description": "",
"devDependencies": {
"@types/cors": "^2.8.17",
"@types/express": "^5.0.0",
"@types/kafkajs": "^1.8.2",
"@types/node": "^22.7.6",
Expand All @@ -18,6 +19,7 @@
"typescript": "^5.6.3"
},
"dependencies": {
"cors": "^2.8.5",
"express": "^4.21.1",
"kafkajs": "^2.2.4"
}
Expand Down
Loading

0 comments on commit 0650da9

Please sign in to comment.