diff --git a/apps/hubble/src/rpc/server.ts b/apps/hubble/src/rpc/server.ts index 50de7106ba..3d7cb5a4e0 100644 --- a/apps/hubble/src/rpc/server.ts +++ b/apps/hubble/src/rpc/server.ts @@ -350,6 +350,7 @@ class IpConnectionLimiter { } export function destroyStream(stream: ServerWritableStream | ServerDuplexStream, error: Error) { + log.error({ destroyStreamError: error }, "Destroying stream from server side"); stream.emit("error", error); stream.end(); } @@ -447,7 +448,7 @@ export default class Server { return new Promise((resolve, reject) => { this.grpcServer.bindAsync(`${ip}:${port}`, ServerCredentials.createInsecure(), (err, port) => { if (err) { - logger.error({ component: "gRPC Server", err }, "Failed to start gRPC Server. Is the port already in use?"); + log.error({ component: "gRPC Server", err }, "Failed to start gRPC Server. Is the port already in use?"); reject(err); } else { this.grpcServer.start(); @@ -455,7 +456,7 @@ export default class Server { this.listenIp = ip; this.port = port; - logger.info({ component: "gRPC Server", address: this.address }, "Starting gRPC Server"); + log.info({ component: "gRPC Server", address: this.address }, "Starting gRPC Server"); resolve(port); } }); @@ -531,9 +532,10 @@ export default class Server { public getInfoRPC(call: ServerUnaryCall, callback: sendUnaryData) { (async () => { const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); - log.debug({ method: "getInfo", req: call?.request || { dbStats: false } }, `RPC call from ${peer}`); + log.info({ method: "getInfo", req: call?.request || { dbStats: false } }, `RPC call from ${peer} started`); const info = await this.getInfo(call?.request || { dbStats: false }); + log.info({ method: "getInfo", req: call?.request || { dbStats: false } }, `RPC call from ${peer} complete`); callback(null, info); })(); @@ -557,9 +559,11 @@ export default class Server { public stopSyncRPC(call: ServerUnaryCall, callback: sendUnaryData) { (async () => { const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); - log.debug({ method: "stopSync", req: call.request }, `RPC call from ${peer}`); + log.info({ method: "stopSync", req: call.request }, `RPC call from ${peer} started`); const result = await this.stopSync(); + + log.info({ method: "stopSync", req: call.request }, `RPC call from ${peer} complete`); if (result.isErr()) { callback(toServiceError(result.error)); } else { @@ -603,9 +607,12 @@ export default class Server { ) { (async () => { const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); - log.debug({ method: "forceSync", req: call.request }, `RPC call from ${peer}`); + log.info({ method: "forceSync", req: call.request }, `RPC call from ${peer} started`); const result = await this.forceSync(call.request); + + log.info({ method: "forceSync", req: call.request }, `RPC call from ${peer} complete`); + if (result.isErr()) { callback(toServiceError(result.error)); } else { @@ -631,9 +638,12 @@ export default class Server { ) { (async () => { const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); - log.debug({ method: "getCurrentPeers", req: call.request }, `RPC call from ${peer}`); + log.info({ method: "getCurrentPeers", req: call.request }, `RPC call from ${peer} started`); const result = this.getCurrentPeers(); + + log.info({ method: "getCurrentPeers", req: call.request }, `RPC call from ${peer} complete`); + callback(null, result); })(); } @@ -687,10 +697,12 @@ export default class Server { ) { (async () => { const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); - log.debug({ method: "getSyncStatus", req: call.request }, `RPC call from ${peer}`); + log.info({ method: "getSyncStatus", req: call.request }, `RPC call from ${peer} started`); const peerId = call.request.peerId; const result = await this.getSyncStatus(peerId); + + log.info({ method: "getSyncStatus", req: call.request }, `RPC call from ${peer} complete`); if (result.isErr()) { callback(toServiceError(result.error)); } else { @@ -706,10 +718,12 @@ export default class Server { public getAllSyncIdsByPrefixRPC(call: ServerUnaryCall, callback: sendUnaryData) { const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); - log.debug({ method: "getAllSyncIdsByPrefix", req: call.request }, `RPC call from ${peer}`); + log.info({ method: "getAllSyncIdsByPrefix", req: call.request }, `RPC call from ${peer} started`); (async () => { const result = await this.getAllSyncIdsByPrefix(call.request); + + log.info({ method: "getAllSyncIdsByPrefix", req: call.request }, `RPC call from ${peer} complete`); if (result.isErr()) { callback(toServiceError(result.error)); } else { @@ -754,9 +768,11 @@ export default class Server { callback: sendUnaryData, ) { const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); - log.debug({ method: "getAllMessagesBySyncIds", req: call.request }, `RPC call from ${peer}`); + log.info({ method: "getAllMessagesBySyncIds", req: call.request }, `RPC call from ${peer} started`); const result = await this.getAllMessagesBySyncIds(call.request); + + log.info({ method: "getAllMessagesBySyncIds", req: call.request }, `RPC call from ${peer} complete`); if (result.isErr()) { callback(toServiceError(result.error)); } else { @@ -774,10 +790,12 @@ export default class Server { callback: sendUnaryData, ) { const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); - log.debug({ method: "getSyncMetadataByPrefix", req: call.request }, `RPC call from ${peer}`); + log.info({ method: "getSyncMetadataByPrefix", req: call.request }, `RPC call from ${peer} started`); (async () => { const result = await this.getSyncMetadataByPrefix(call.request); + + log.info({ method: "getSyncMetadataByPrefix", req: call.request }, `RPC call from ${peer} complete`); if (result.isErr()) { callback(toServiceError(result.error)); } else { @@ -809,7 +827,7 @@ export default class Server { callback: sendUnaryData, ) { const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); - log.debug({ method: "getSyncSnapshotByPrefix", req: call.request }, `RPC call from ${peer}`); + log.info({ method: "getSyncSnapshotByPrefix", req: call.request }, `RPC call from ${peer} started`); // If someone is asking for our sync snapshot, that means we're getting incoming // connections @@ -818,6 +836,8 @@ export default class Server { (async () => { const result = await this.getSyncSnapshotByPrefix(call.request); + + log.info({ method: "getSyncSnapshotByPrefix", req: call.request }, `RPC call from ${peer} complete`); if (result.isErr()) { callback(toServiceError(result.error)); } else { @@ -842,9 +862,11 @@ export default class Server { callback: sendUnaryData, ) { const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); - log.debug({ method: "getOnChainSignersByFid", req: call.request }, `RPC call from ${peer}`); + log.info({ method: "getOnChainSignersByFid", req: call.request }, `RPC call from ${peer} started`); const signersResult = await this.getOnChainSignersByFid(call.request); + + log.info({ method: "getOnChainSignersByFid", req: call.request }, `RPC call from ${peer} complete`); signersResult?.match( (page: OnChainEventResponse) => { callback(null, page); @@ -867,10 +889,11 @@ export default class Server { callback: sendUnaryData, ) { const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); - log.debug({ method: "getOnChainEvents", req: call.request }, `RPC call from ${peer}`); + log.info({ method: "getOnChainEvents", req: call.request }, `RPC call from ${peer} started`); (async () => { const result = await this.getOnChainEvents(call.request); + log.info({ method: "getOnChainEvents", req: call.request }, `RPC call from ${peer} complete`); if (result.isErr()) { callback(toServiceError(result.error)); } else { @@ -905,7 +928,7 @@ export default class Server { const rateLimitResult = await rateLimitByIp(peer, this.submitMessageRateLimiter); if (rateLimitResult.isErr()) { - logger.warn({ peer }, "submitMessage rate limited"); + log.warn({ peer }, "submitMessage rate limited"); callback(toServiceError(new HubError("unavailable", "API rate limit exceeded"))); return; } @@ -913,7 +936,7 @@ export default class Server { // Authentication const authResult = authenticateUser(call.metadata, this.rpcUsers); if (authResult.isErr()) { - logger.warn({ errMsg: authResult.error.message }, "gRPC submitMessage failed"); + log.warn({ errMsg: authResult.error.message }, "gRPC submitMessage failed"); callback( toServiceError(new HubError("unauthenticated", `gRPC authentication failed: ${authResult.error.message}`)), ); @@ -941,7 +964,7 @@ export default class Server { // Check for rate limits const rateLimitResult = await rateLimitByIp(peer, this.submitMessageRateLimiter); if (rateLimitResult.isErr()) { - logger.warn({ peer }, "submitBulkMessages rate limited"); + log.warn({ peer }, "submitBulkMessages rate limited"); callback(toServiceError(new HubError("unavailable", "API rate limit exceeded"))); return; } @@ -949,7 +972,7 @@ export default class Server { // Authentication const authResult = authenticateUser(call.metadata, this.rpcUsers); if (authResult.isErr()) { - logger.warn({ errMsg: authResult.error.message }, "gRPC submitBulkMessages failed"); + log.warn({ errMsg: authResult.error.message }, "gRPC submitBulkMessages failed"); callback( toServiceError(new HubError("unauthenticated", `gRPC authentication failed: ${authResult.error.message}`)), ); @@ -1009,11 +1032,13 @@ export default class Server { }, getCast: async (call, callback) => { const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); - log.debug({ method: "getCast", req: call.request }, `RPC call from ${peer}`); + log.info({ method: "getCast", req: call.request }, `RPC call from ${peer} started`); const request = call.request; const castAddResult = await this.engine?.getCast(request.fid, request.hash); + + log.info({ method: "getCast", req: call.request }, `RPC call from ${peer} complete`); castAddResult?.match( (castAdd: CastAddMessage) => { callback(null, castAdd); @@ -1025,7 +1050,7 @@ export default class Server { }, getCastsByFid: async (call, callback) => { const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); - log.debug({ method: "getCastsByFid", req: call.request }, `RPC call from ${peer}`); + log.info({ method: "getCastsByFid", req: call.request }, `RPC call from ${peer} started`); const { fid, pageSize, pageToken, reverse } = call.request; @@ -1034,6 +1059,9 @@ export default class Server { pageToken, reverse, }); + + log.info({ method: "getCastsByFid", req: call.request }, `RPC call from ${peer} complete`); + castsResult?.match( (page: MessagesPage) => { callback(null, messagesPageToResponse(page)); @@ -1045,7 +1073,7 @@ export default class Server { }, getCastsByParent: async (call, callback) => { const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); - log.debug({ method: "getCastsByParent", req: call.request }, `RPC call from ${peer}`); + log.info({ method: "getCastsByParent", req: call.request }, `RPC call from ${peer} complete`); const { parentCastId, parentUrl, pageSize, pageToken, reverse } = call.request; @@ -1061,6 +1089,9 @@ export default class Server { pageToken, reverse, }); + + log.info({ method: "getCastsByParent", req: call.request }, `RPC call from ${peer} complete`); + castsResult?.match( (page: MessagesPage) => { callback(null, messagesPageToResponse(page)); @@ -1072,11 +1103,14 @@ export default class Server { }, getCastsByMention: async (call, callback) => { const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); - log.debug({ method: "getCastsByMention", req: call.request }, `RPC call from ${peer}`); + log.info({ method: "getCastsByMention", req: call.request }, `RPC call from ${peer} started`); const { fid, pageSize, pageToken, reverse } = call.request; const castsResult = await this.engine?.getCastsByMention(fid, { pageSize, pageToken, reverse }); + + log.info({ method: "getCastsByMention", req: call.request }, `RPC call from ${peer} complete`); + castsResult?.match( (page: MessagesPage) => { callback(null, messagesPageToResponse(page)); @@ -1088,7 +1122,7 @@ export default class Server { }, getReaction: async (call, callback) => { const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); - log.debug({ method: "getReaction", req: call.request }, `RPC call from ${peer}`); + log.info({ method: "getReaction", req: call.request }, `RPC call from ${peer} started`); const request = call.request; @@ -1097,6 +1131,8 @@ export default class Server { request.reactionType, request.targetCastId ?? request.targetUrl ?? "", ); + + log.info({ method: "getReaction", req: call.request }, `RPC call from ${peer} complete`); reactionResult?.match( (reaction: ReactionAddMessage) => { callback(null, reaction); @@ -1108,7 +1144,7 @@ export default class Server { }, getReactionsByFid: async (call, callback) => { const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); - log.debug({ method: "getReactionsByFid", req: call.request }, `RPC call from ${peer}`); + log.info({ method: "getReactionsByFid", req: call.request }, `RPC call from ${peer}`); const { fid, reactionType, pageSize, pageToken, reverse } = call.request; const reactionsResult = await this.engine?.getReactionsByFid(fid, reactionType, { @@ -1116,6 +1152,8 @@ export default class Server { pageToken, reverse, }); + + log.info({ method: "getReactionsByFid", req: call.request }, `RPC call from ${peer} complete`); reactionsResult?.match( (page: MessagesPage) => { callback(null, messagesPageToResponse(page)); @@ -1127,7 +1165,7 @@ export default class Server { }, getReactionsByCast: async (call, callback) => { const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); - log.debug({ method: "getReactionsByCast", req: call.request }, `RPC call from ${peer}`); + log.info({ method: "getReactionsByCast", req: call.request }, `RPC call from ${peer}`); const { targetCastId, reactionType, pageSize, pageToken, reverse } = call.request; const reactionsResult = await this.engine?.getReactionsByTarget(targetCastId ?? CastId.create(), reactionType, { @@ -1135,6 +1173,8 @@ export default class Server { pageToken, reverse, }); + + log.info({ method: "getReactionsByCast", req: call.request }, `RPC call from ${peer} complete`); reactionsResult?.match( (page: MessagesPage) => { callback(null, messagesPageToResponse(page)); @@ -1146,7 +1186,7 @@ export default class Server { }, getReactionsByTarget: async (call, callback) => { const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); - log.debug({ method: "getReactionsByTarget", req: call.request }, `RPC call from ${peer}`); + log.info({ method: "getReactionsByTarget", req: call.request }, `RPC call from ${peer}`); const { targetCastId, targetUrl, reactionType, pageSize, pageToken, reverse } = call.request; const reactionsResult = await this.engine?.getReactionsByTarget(targetCastId ?? targetUrl ?? "", reactionType, { @@ -1154,6 +1194,8 @@ export default class Server { pageToken, reverse, }); + + log.info({ method: "getReactionsByTarget", req: call.request }, `RPC call from ${peer} complete`); reactionsResult?.match( (page: MessagesPage) => { callback(null, messagesPageToResponse(page)); @@ -1165,11 +1207,13 @@ export default class Server { }, getUserData: async (call, callback) => { const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); - log.debug({ method: "getUserData", req: call.request }, `RPC call from ${peer}`); + log.info({ method: "getUserData", req: call.request }, `RPC call from ${peer} started`); const request = call.request; const userDataResult = await this.engine?.getUserData(request.fid, request.userDataType); + + log.info({ method: "getUserData", req: call.request }, `RPC call from ${peer} complete`); userDataResult?.match( (userData: UserDataAddMessage) => { callback(null, userData); @@ -1181,7 +1225,7 @@ export default class Server { }, getUserDataByFid: async (call, callback) => { const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); - log.debug({ method: "getUserDataByFid", req: call.request }, `RPC call from ${peer}`); + log.info({ method: "getUserDataByFid", req: call.request }, `RPC call from ${peer} started`); const { fid, pageSize, pageToken, reverse } = call.request; @@ -1190,6 +1234,8 @@ export default class Server { pageToken, reverse, }); + + log.info({ method: "getUserDataByFid", req: call.request }, `RPC call from ${peer} complete`); userDataResult?.match( (page: MessagesPage) => { callback(null, messagesPageToResponse(page)); @@ -1201,11 +1247,13 @@ export default class Server { }, getUsernameProof: async (call, callback) => { const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); - log.debug({ method: "getUsernameProof", req: call.request }, `RPC call from ${peer}`); + log.info({ method: "getUsernameProof", req: call.request }, `RPC call from ${peer} started`); const request = call.request; const usernameProofResult = await this.engine?.getUserNameProof(request.name); + + log.info({ method: "getUsernameProof", req: call.request }, `RPC call from ${peer} complete`); usernameProofResult?.match( (usernameProof: UserNameProof) => { callback(null, usernameProof); @@ -1217,11 +1265,13 @@ export default class Server { }, getUserNameProofsByFid: async (call, callback) => { const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); - log.debug({ method: "getUserNameProofsByFid", req: call.request }, `RPC call from ${peer}`); + log.info({ method: "getUserNameProofsByFid", req: call.request }, `RPC call from ${peer} started`); const request = call.request; const usernameProofResult = await this.engine?.getUserNameProofsByFid(request.fid); + + log.info({ method: "getUserNameProofsByFid", req: call.request }, `RPC call from ${peer} complete`); usernameProofResult?.match( (usernameProofs: UserNameProof[]) => { callback(null, UsernameProofsResponse.create({ proofs: usernameProofs })); @@ -1233,11 +1283,13 @@ export default class Server { }, getVerification: async (call, callback) => { const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); - log.debug({ method: "getVerification", req: call.request }, `RPC call from ${peer}`); + log.info({ method: "getVerification", req: call.request }, `RPC call from ${peer} started`); const request = call.request; const verificationResult = await this.engine?.getVerification(request.fid, request.address); + + log.info({ method: "getVerification", req: call.request }, `RPC call from ${peer} complete`); verificationResult?.match( (verification: VerificationAddAddressMessage) => { callback(null, verification); @@ -1249,7 +1301,7 @@ export default class Server { }, getVerificationsByFid: async (call, callback) => { const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); - log.debug({ method: "getVerificationsByFid", req: call.request }, `RPC call from ${peer}`); + log.info({ method: "getVerificationsByFid", req: call.request }, `RPC call from ${peer} started`); const { fid, pageSize, pageToken, reverse } = call.request; @@ -1258,6 +1310,8 @@ export default class Server { pageToken, reverse, }); + + log.info({ method: "getVerificationsByFid", req: call.request }, `RPC call from ${peer} complete`); verificationsResult?.match( (page: MessagesPage) => { callback(null, messagesPageToResponse(page)); @@ -1269,11 +1323,13 @@ export default class Server { }, getOnChainSigner: async (call, callback) => { const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); - log.debug({ method: "getOnChainSigner", req: call.request }, `RPC call from ${peer}`); + log.info({ method: "getOnChainSigner", req: call.request }, `RPC call from ${peer} started`); const request = call.request; const signerResult = await this.engine?.getActiveSigner(request.fid, request.signer); + + log.info({ method: "getOnChainSigner", req: call.request }, `RPC call from ${peer} complete`); signerResult?.match( (signer: SignerOnChainEvent) => { callback(null, signer); @@ -1285,11 +1341,13 @@ export default class Server { }, getLink: async (call, callback) => { const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); - log.debug({ method: "getLink", req: call.request }, `RPC call from ${peer}`); + log.info({ method: "getLink", req: call.request }, `RPC call from ${peer} started`); const request = call.request; const linkResult = await this.engine?.getLink(request.fid, request.linkType, request.targetFid ?? 0); + + log.info({ method: "getLink", req: call.request }, `RPC call from ${peer} complete`); linkResult?.match( (link: LinkAddMessage) => { callback(null, link); @@ -1301,7 +1359,7 @@ export default class Server { }, getLinksByFid: async (call, callback) => { const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); - log.debug({ method: "getLinksByFid", req: call.request }, `RPC call from ${peer}`); + log.info({ method: "getLinksByFid", req: call.request }, `RPC call from ${peer} started`); const { fid, linkType, pageSize, pageToken, reverse } = call.request; const linksResult = await this.engine?.getLinksByFid(fid, linkType, { @@ -1309,6 +1367,8 @@ export default class Server { pageToken, reverse, }); + + log.info({ method: "getLinksByFid", req: call.request }, `RPC call from ${peer} complete`); linksResult?.match( (page: MessagesPage) => { callback(null, messagesPageToResponse(page)); @@ -1320,7 +1380,7 @@ export default class Server { }, getLinksByTarget: async (call, callback) => { const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); - log.debug({ method: "getLinksByTarget", req: call.request }, `RPC call from ${peer}`); + log.info({ method: "getLinksByTarget", req: call.request }, `RPC call from ${peer} started`); const { targetFid, linkType, pageSize, pageToken, reverse } = call.request; const linksResult = await this.engine?.getLinksByTarget(targetFid ?? 0, linkType, { @@ -1328,6 +1388,8 @@ export default class Server { pageToken, reverse, }); + + log.info({ method: "getLinksByTarget", req: call.request }, `RPC call from ${peer} complete`); linksResult?.match( (page: MessagesPage) => { callback(null, messagesPageToResponse(page)); @@ -1339,10 +1401,12 @@ export default class Server { }, getIdRegistryOnChainEvent: async (call, callback) => { const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); - log.debug({ method: "getIdRegistryOnChainEvent", req: call.request }, `RPC call from ${peer}`); + log.info({ method: "getIdRegistryOnChainEvent", req: call.request }, `RPC call from ${peer} started`); const request = call.request; const idRegistryEventResult = await this.engine?.getIdRegistryOnChainEvent(request.fid); + + log.info({ method: "getIdRegistryOnChainEvent", req: call.request }, `RPC call from ${peer} complete`); idRegistryEventResult?.match( (idRegistryEvent: OnChainEvent) => { callback(null, idRegistryEvent); @@ -1354,10 +1418,12 @@ export default class Server { }, getIdRegistryOnChainEventByAddress: async (call, callback) => { const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); - log.debug({ method: "getIdRegistryOnChainEventByAddress", req: call.request }, `RPC call from ${peer}`); + log.info({ method: "getIdRegistryOnChainEventByAddress", req: call.request }, `RPC call from ${peer} started`); const request = call.request; const idRegistryEventResult = await this.engine?.getIdRegistryOnChainEventByAddress(request.address); + + log.info({ method: "getIdRegistryOnChainEventByAddress", req: call.request }, `RPC call from ${peer} complete`); idRegistryEventResult?.match( (idRegistryEvent: OnChainEvent) => { callback(null, idRegistryEvent); @@ -1368,8 +1434,13 @@ export default class Server { ); }, getCurrentStorageLimitsByFid: async (call, callback) => { + const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); + log.info({ method: "getCurrentStorageLimitsByFid", req: call.request }, `RPC call from ${peer} started`); + const request = call.request; const storageLimitsResult = await this.engine?.getCurrentStorageLimitsByFid(request.fid); + + log.info({ method: "getCurrentStorageLimitsByFid", req: call.request }, `RPC call from ${peer} complete`); storageLimitsResult?.match( (storageLimits: StorageLimitsResponse) => { callback(null, storageLimits); @@ -1381,7 +1452,7 @@ export default class Server { }, getFids: async (call, callback) => { const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); - log.debug({ method: "getFids", req: call.request }, `RPC call from ${peer}`); + log.info({ method: "getFids", req: call.request }, `RPC call from ${peer} started`); const { pageSize, pageToken, reverse } = call.request; @@ -1390,6 +1461,7 @@ export default class Server { pageToken, reverse, }); + log.info({ method: "getFids", req: call.request }, `RPC call from ${peer} complete`); result?.match( (page: { fids: number[]; nextPageToken: Uint8Array | undefined }) => { callback(null, FidsResponse.create(page)); @@ -1401,7 +1473,7 @@ export default class Server { }, getAllCastMessagesByFid: async (call, callback) => { const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); - log.debug({ method: "getAllCastMessagesByFid", req: call.request }, `RPC call from ${peer}`); + log.info({ method: "getAllCastMessagesByFid", req: call.request }, `RPC call from ${peer} started`); const { fid, pageSize, pageToken, reverse, startTimestamp, stopTimestamp } = call.request; const result = await this.engine?.getAllCastMessagesByFid( @@ -1414,6 +1486,7 @@ export default class Server { startTimestamp, stopTimestamp, ); + log.info({ method: "getAllCastMessagesByFid", req: call.request }, `RPC call from ${peer} complete`); result?.match( (page: MessagesPage) => { callback(null, messagesPageToResponse(page)); @@ -1425,7 +1498,7 @@ export default class Server { }, getAllReactionMessagesByFid: async (call, callback) => { const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); - log.debug({ method: "getAllReactionMessagesByFid", req: call.request }, `RPC call from ${peer}`); + log.info({ method: "getAllReactionMessagesByFid", req: call.request }, `RPC call from ${peer} started`); const { fid, pageSize, pageToken, reverse, startTimestamp, stopTimestamp } = call.request; const result = await this.engine?.getAllReactionMessagesByFid( @@ -1438,6 +1511,7 @@ export default class Server { startTimestamp, stopTimestamp, ); + log.info({ method: "getAllReactionMessagesByFid", req: call.request }, `RPC call from ${peer} complete`); result?.match( (page: MessagesPage) => { callback(null, messagesPageToResponse(page)); @@ -1449,7 +1523,7 @@ export default class Server { }, getAllVerificationMessagesByFid: async (call, callback) => { const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); - log.debug({ method: "getAllVerificationMessagesByFid", req: call.request }, `RPC call from ${peer}`); + log.info({ method: "getAllVerificationMessagesByFid", req: call.request }, `RPC call from ${peer} started`); const { fid, pageSize, pageToken, reverse, startTimestamp, stopTimestamp } = call.request; const result = await this.engine?.getAllVerificationMessagesByFid( @@ -1462,6 +1536,8 @@ export default class Server { startTimestamp, stopTimestamp, ); + + log.info({ method: "getAllVerificationMessagesByFid", req: call.request }, `RPC call from ${peer} complete`); result?.match( (page: MessagesPage) => { callback(null, messagesPageToResponse(page)); @@ -1473,7 +1549,7 @@ export default class Server { }, getAllUserDataMessagesByFid: async (call, callback) => { const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); - log.debug({ method: "getAllUserDataMessagesByFid", req: call.request }, `RPC call from ${peer}`); + log.info({ method: "getAllUserDataMessagesByFid", req: call.request }, `RPC call from ${peer} started`); const { fid, pageSize, pageToken, reverse, startTimestamp, stopTimestamp } = call.request; const result = await this.engine?.getUserDataByFid( @@ -1486,6 +1562,8 @@ export default class Server { startTimestamp, stopTimestamp, ); + + log.info({ method: "getAllUserDataMessagesByFid", req: call.request }, `RPC call from ${peer} complete`); result?.match( (page: MessagesPage) => { callback(null, messagesPageToResponse(page)); @@ -1497,7 +1575,7 @@ export default class Server { }, getAllLinkMessagesByFid: async (call, callback) => { const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); - log.debug({ method: "getAllLinkMessagesByFid", req: call.request }, `RPC call from ${peer}`); + log.info({ method: "getAllLinkMessagesByFid", req: call.request }, `RPC call from ${peer} started`); const { fid, pageSize, pageToken, reverse, startTimestamp, stopTimestamp } = call.request; const result = await this.engine?.getAllLinkMessagesByFid( @@ -1510,6 +1588,8 @@ export default class Server { startTimestamp, stopTimestamp, ); + + log.info({ method: "getAllLinkMessagesByFid", req: call.request }, `RPC call from ${peer} complete`); result?.match( (page: MessagesPage) => { callback(null, messagesPageToResponse(page)); @@ -1521,7 +1601,7 @@ export default class Server { }, getLinkCompactStateMessageByFid: async (call, callback) => { const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); - log.debug({ method: "getLinkCompactStateMessageByFid", req: call.request }, `RPC call from ${peer}`); + log.info({ method: "getLinkCompactStateMessageByFid", req: call.request }, `RPC call from ${peer} started`); const { fid, pageSize, pageToken, reverse } = call.request; const result = await this.engine?.getLinkCompactStateMessageByFid(fid, { @@ -1529,6 +1609,8 @@ export default class Server { pageToken, reverse, }); + + log.info({ method: "getLinkCompactStateMessageByFid", req: call.request }, `RPC call from ${peer} complete`); result?.match( (page: MessagesPage) => { callback(null, messagesPageToResponse(page)); @@ -1540,9 +1622,10 @@ export default class Server { }, getEvent: async (call, callback) => { const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); - log.debug({ method: "getEvent", req: call.request }, `RPC call from ${peer}`); + log.info({ method: "getEvent", req: call.request }, `RPC call from ${peer} started`); const result = await this.engine?.getEvent(call.request.id); + log.info({ method: "getEvent", req: call.request }, `RPC call from ${peer} complete`); result?.match( (event: HubEvent) => callback(null, event), (err: HubError) => callback(toServiceError(err)), @@ -1616,13 +1699,17 @@ export default class Server { this.subscribeIpLimiter.removeConnection(peer); - log.info({ peer }, "subscribe: stream closed"); + log.info({ r: request, peer }, "subscribe: stream closed"); }); // If the user wants to start from a specific event, we'll start from there first if (this.engine && request.fromId !== undefined && request.fromId >= 0) { const eventsIteratorOpts = this.engine.eventHandler.getEventsIteratorOpts({ fromId: request.fromId }); if (eventsIteratorOpts.isErr()) { + log.error( + { r: request, eventIteratorOptsError: eventsIteratorOpts.error }, + "Error getting events iterator, destroying stream", + ); destroyStream(stream, eventsIteratorOpts.error); return; } @@ -1631,7 +1718,7 @@ export default class Server { // This is to prevent a situation where we're writing to the stream, but the client // is not reading it. const timeout = setTimeout(async () => { - logger.warn( + log.warn( { timeout: HUBEVENTS_READER_TIMEOUT, peer: stream.getPeer() }, "HubEvents subscribe: timeout, stopping stream", ); @@ -1661,7 +1748,7 @@ export default class Server { const writeResult = bufferedStreamWriter.writeToStream(event); if (writeResult.isErr()) { - logger.warn( + log.warn( { err: writeResult.error }, `subscribe: failed to write to stream while returning events ${request.fromId}`, ); @@ -1681,7 +1768,7 @@ export default class Server { // more than 1G, so we're writing a lot of data to the stream, but the client is not reading it. // We'll destroy the stream. const error = new HubError("unavailable.network_failure", "stream memory usage too much"); - logger.error({ errCode: error.errCode }, error.message); + log.error({ errCode: error.errCode }, error.message); destroyStream(stream, error); return true;