Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor all Gmail API calls #289

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions apps/web/app/(app)/simple/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { getGmailClient } from "@/utils/gmail/client";
import { parseMessage } from "@/utils/mail";
import { SimpleModeOnboarding } from "@/app/(app)/simple/SimpleModeOnboarding";
import { ClientOnly } from "@/components/ClientOnly";
import { getMessage } from "@/utils/gmail/message";
import { getMessage, getMessages } from "@/utils/gmail/message";

export const dynamic = "force-dynamic";

Expand All @@ -28,19 +28,18 @@ export default async function SimplePage({

const categoryTitle = simpleEmailCategories.get(type);

const response = await gmail.users.messages.list({
userId: "me",
const response = await getMessages(gmail, {
labelIds: type === "OTHER" ? undefined : [type],
maxResults: 5,
q: getQuery(type),
query: getQuery(type),
pageToken,
});

// TODO need a better way to handle this. Don't want to miss messages,
// but don't want to show the same thread twice
// only take the latest email in each thread
// const filteredMessages = filterDuplicateThreads(response.data.messages || []);
const filteredMessages = response.data.messages;
const filteredMessages = response.messages;

const messages = await Promise.all(
filteredMessages?.map(async (message) => {
Expand All @@ -66,7 +65,7 @@ export default async function SimplePage({
<PageHeading className="text-center">{title}</PageHeading>
<SimpleList
messages={messages}
nextPageToken={response.data.nextPageToken}
nextPageToken={response.nextPageToken}
userEmail={email}
type={type}
/>
Expand Down
7 changes: 2 additions & 5 deletions apps/web/app/api/google/threads/[id]/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { z } from "zod";
import type { gmail_v1 } from "@googleapis/gmail";
import { NextResponse } from "next/server";
import { parseMessages } from "@/utils/mail";
import { getThread as getGmailThread } from "@/utils/gmail/thread";
import { auth } from "@/app/api/auth/[...nextauth]/auth";
import { getGmailClient } from "@/utils/gmail/client";
import type { ThreadWithPayloadMessages } from "@/utils/types";
Expand All @@ -14,11 +15,7 @@ export type ThreadQuery = z.infer<typeof threadQuery>;
export type ThreadResponse = Awaited<ReturnType<typeof getThread>>;

async function getThread(query: ThreadQuery, gmail: gmail_v1.Gmail) {
const res = await gmail.users.threads.get({
userId: "me",
id: query.id,
});
const thread = res.data;
const thread = await getGmailThread(query.id, gmail);

const messages = parseMessages(thread as ThreadWithPayloadMessages);

Expand Down
11 changes: 5 additions & 6 deletions apps/web/app/api/google/threads/basic/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@ async function getGetThreads(
{ from, labelId }: GetThreadsQuery,
gmail: gmail_v1.Gmail,
) {
const threads = await getThreads(
`from:${from}`,
labelId ? [labelId] : [],
gmail,
500,
);
const threads = await getThreads(gmail, {
q: `from:${from}`,
labelIds: labelId ? [labelId] : [],
maxResults: 500,
});
return threads.threads || [];
}

Expand Down
19 changes: 14 additions & 5 deletions apps/web/app/api/google/threads/controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ import {
import { type ThreadWithPayloadMessages, isDefined } from "@/utils/types";
import prisma from "@/utils/prisma";
import { getCategory } from "@/utils/redis/category";
import { getThreadsBatch } from "@/utils/gmail/thread";
import {
getThreadsBatch,
getThreads as getGmailThreads,
} from "@/utils/gmail/thread";
import { decodeSnippet } from "@/utils/gmail/decode";
import type { ThreadsQuery } from "@/app/api/google/threads/validation";
import { ExecutedRuleStatus } from "@prisma/client";
Expand Down Expand Up @@ -46,16 +49,22 @@ export async function getThreads(query: ThreadsQuery) {
return undefined;
}

const gmailThreads = await gmail.users.threads.list({
userId: "me",
const gmailThreads = await getGmailThreads(gmail, {
labelIds: getLabelIds(query.type),
maxResults: query.limit || 50,
q: getQuery(),
pageToken: query.nextPageToken || undefined,
});

const r = await getGmailThreads(gmail, {
labelIds: getLabelIds(query.type),
maxResults: query.limit || 50,
q: getQuery(),
pageToken: query.nextPageToken || undefined,
});

const threadIds =
gmailThreads.data.threads?.map((t) => t.id).filter(isDefined) || [];
gmailThreads.threads?.map((t) => t.id).filter(isDefined) || [];

const [threads, plans] = await Promise.all([
getThreadsBatch(threadIds, accessToken), // may have been faster not using batch method, but doing 50 getMessages in parallel
Expand Down Expand Up @@ -100,7 +109,7 @@ export async function getThreads(query: ThreadsQuery) {

return {
threads: threadsWithMessages.filter(isDefined),
nextPageToken: gmailThreads.data.nextPageToken,
nextPageToken: gmailThreads.nextPageToken,
};
}

Expand Down
20 changes: 9 additions & 11 deletions apps/web/app/api/google/watch/controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,29 @@ import { INBOX_LABEL_ID } from "@/utils/gmail/label";
import { env } from "@/env";
import { getGmailClient } from "@/utils/gmail/client";
import { captureException } from "@/utils/error";
import { unwatchUser, watchUser } from "@/utils/gmail/misc";

export async function watchEmails(userId: string, gmail: gmail_v1.Gmail) {
const res = await gmail.users.watch({
userId: "me",
requestBody: {
labelIds: [INBOX_LABEL_ID],
labelFilterBehavior: "include",
topicName: env.GOOGLE_PUBSUB_TOPIC_NAME,
},
const res = await watchUser(gmail, {
labelIds: [INBOX_LABEL_ID],
labelFilterBehavior: "include",
topicName: env.GOOGLE_PUBSUB_TOPIC_NAME,
Comment on lines +10 to +13
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Handle potential exceptions thrown by watchUser.
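If the API call fails (for example, because of invalid credentials or insufficient permissions), the exception propagates out of watchEmails and is handled only if a caller catches it. Consider wrapping the call in try/catch here, or rethrowing with added context so callers can handle the failure in a structured way.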

});

if (res.data.expiration) {
const expirationDate = new Date(+res.data.expiration);
if (res.expiration) {
const expirationDate = new Date(+res.expiration);
await prisma.user.update({
where: { id: userId },
data: { watchEmailsExpirationDate: expirationDate },
});
return expirationDate;
}
console.error("Error watching inbox", res.data);
console.error("Error watching inbox", res);
}

async function unwatch(gmail: gmail_v1.Gmail) {
console.log("Unwatching emails");
await gmail.users.stop({ userId: "me" });
await unwatchUser(gmail);
}

export async function unwatchEmails({
Expand Down
4 changes: 2 additions & 2 deletions apps/web/app/api/google/webhook/process-history.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import { categorizeSender } from "@/utils/categorize/senders/categorize";
import { unwatchEmails } from "@/app/api/google/watch/controller";
import { createScopedLogger } from "@/utils/logger";
import { markMessageAsProcessing } from "@/utils/redis/message-processing";
import { getHistory } from "@/utils/gmail/misc";

const logger = createScopedLogger("Process History");

Expand Down Expand Up @@ -153,8 +154,7 @@ export async function processHistoryForUser(
email,
});

const history = await gmail.users.history.list({
userId: "me",
const history = await getHistory(gmail, {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Pagination not implemented in getHistory function

The current implementation of getHistory doesn't handle pagination for large result sets. It makes a single request to Gmail API and returns the response directly without checking for or processing nextPageToken. This could lead to incomplete history retrieval for heavily backlogged accounts.

Key issues:

  • No handling of nextPageToken from Gmail API response
  • Single request limited to maxResults: 500 in the calling code
  • No aggregation of results across multiple pages
🔗 Analysis chain

Check long history retrieval edge cases.
Wrapping getHistory within a try-catch is helpful. For heavily backlogged accounts, ensure we handle large volumes gracefully (e.g., by paginating multiple times).


Let me check the implementation of getHistory to see if it handles pagination.


Let me try a different approach to check the implementation.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check if multiple pages of history are handled consistently
rg -A 10 'getHistory\('

Length of output: 1539


Script:

#!/bin/bash
# Check the full implementation of getHistory function
ast-grep --pattern 'export async function getHistory($_) {
  $$$
}'

Length of output: 69


Script:

#!/bin/bash
# Check the full implementation of getHistory function using ripgrep
rg -U "export async function getHistory[\s\S]*?\n}" --multiline

Length of output: 1040

// NOTE this can cause problems if we're way behind
// NOTE this doesn't include startHistoryId in the results
startHistoryId,
Expand Down
10 changes: 5 additions & 5 deletions apps/web/app/api/user/assess/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import {
UNREAD_LABEL_ID,
} from "@/utils/gmail/label";
import { SafeError } from "@/utils/error";
import { getFiltersList } from "@/utils/gmail/filter";
import { getForwardingAddresses } from "@/utils/gmail/misc";

export type AssessUserResponse = Awaited<ReturnType<typeof assessUser>>;

Expand Down Expand Up @@ -99,17 +101,15 @@ async function getLabelCount(gmail: gmail_v1.Gmail) {
}

async function getFiltersCount(gmail: gmail_v1.Gmail) {
const res = await gmail.users.settings.filters.list({ userId: "me" });
const res = await getFiltersList({ gmail });
const filters = res.data.filter || [];
return filters.length;
}

async function getForwardingAddressesCount(gmail: gmail_v1.Gmail) {
try {
const res = await gmail.users.settings.forwardingAddresses.list({
userId: "me",
});
return res.data.forwardingAddresses?.length || 0;
const res = await getForwardingAddresses(gmail);
return res.forwardingAddresses?.length || 0;
} catch (error) {
// Can happen due to "Forwarding features disabled by administrator"
console.error("Error getting forwarding addresses", error);
Expand Down
20 changes: 8 additions & 12 deletions apps/web/app/api/user/bulk-archive/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,20 @@ import { getGmailClient } from "@/utils/gmail/client";
import { INBOX_LABEL_ID, getOrCreateInboxZeroLabel } from "@/utils/gmail/label";
import { sleep } from "@/utils/sleep";
import { withError } from "@/utils/middleware";
import { getThreads, modifyThread } from "@/utils/gmail/thread";

const bulkArchiveBody = z.object({ daysAgo: z.string() });
export type BulkArchiveBody = z.infer<typeof bulkArchiveBody>;
export type BulkArchiveResponse = Awaited<ReturnType<typeof bulkArchive>>;

async function bulkArchive(body: BulkArchiveBody, gmail: gmail_v1.Gmail) {
const res = await gmail.users.threads.list({
userId: "me",
const res = await getThreads(gmail, {
maxResults: 500,
q: `older_than:${body.daysAgo}d`,
labelIds: [INBOX_LABEL_ID],
});

console.log(`Archiving ${res.data.threads?.length} threads`);
console.log(`Archiving ${res.threads?.length} threads`);

const archivedLabel = await getOrCreateInboxZeroLabel({
gmail,
Expand All @@ -29,22 +29,18 @@ async function bulkArchive(body: BulkArchiveBody, gmail: gmail_v1.Gmail) {
if (!archivedLabel.id)
throw new Error("Failed to get or create archived label");

for (const thread of res.data.threads || []) {
await gmail.users.threads.modify({
userId: "me",
id: thread.id!,
requestBody: {
addLabelIds: [archivedLabel.id],
removeLabelIds: [INBOX_LABEL_ID],
},
for (const thread of res.threads || []) {
await modifyThread(gmail, thread.id!, {
addLabelIds: [archivedLabel.id],
removeLabelIds: [INBOX_LABEL_ID],
Comment on lines +32 to +35
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider parallel archiving in small batches.
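Archiving threads one-by-one with a sleep(40) call after each is straightforward but can be slow for large numbers of threads, because every iteration also waits for the full request round trip. Issuing several modifications in parallel before sleeping can improve throughput, provided the sleep is scaled with the batch size so the overall rate stays within the 25-threads-per-second quota noted in the code comment.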

});

// we're allowed to archive 250/10 = 25 threads per second:
// https://developers.google.com/gmail/api/reference/quota
await sleep(40); // 1s / 25 = 40ms
}

return { count: res.data.threads?.length || 0 };
return { count: res.threads?.length || 0 };
}

export const POST = withError(async (request: Request) => {
Expand Down
11 changes: 5 additions & 6 deletions apps/web/app/api/user/group/[groupId]/messages/controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import prisma from "@/utils/prisma";
import type { gmail_v1 } from "@googleapis/gmail";
import { createHash } from "node:crypto";
import groupBy from "lodash/groupBy";
import { getMessage } from "@/utils/gmail/message";
import { getMessage, getMessages } from "@/utils/gmail/message";
import { findMatchingGroupItem } from "@/utils/group/find-matching-group";
import { parseMessage } from "@/utils/mail";
import { extractEmailAddress } from "@/utils/email";
Expand Down Expand Up @@ -214,15 +214,14 @@ async function fetchGroupMessages(
): Promise<{ messages: MessageWithGroupItem[]; nextPageToken?: string }> {
const q = buildQuery(groupItemType, groupItems, from, to);

const response = await gmail.users.messages.list({
userId: "me",
const response = await getMessages(gmail, {
maxResults,
pageToken,
q,
query: q,
});

const messages = await Promise.all(
(response.data.messages || []).map(async (message) => {
(response.messages || []).map(async (message) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Limit concurrency in Promise.all loop.
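When processing large numbers of messages, mapping every message to a getMessage call inside Promise.all starts all of the requests at once, which can cause excessive concurrent requests. Consider introducing concurrency limits (e.g., with libraries like p-limit) or batching to prevent potential performance issues and rate limits. Note that the import shown in the suggestion below belongs with the other imports at the top of the module, not inside the function.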

- const messages = await Promise.all(
-   (response.messages || []).map(async (message) => {
-     ...
-   }),
- );
+ import pLimit from "p-limit";
+ const limit = pLimit(5); // limit concurrency to 5
+ const messages = await Promise.all(
+   (response.messages || []).map((msg) => limit(async () => {
+       ...
+   })),
+ );

Committable suggestion skipped: line range outside the PR's diff.

const m = await getMessage(message.id!, gmail);
const parsedMessage = parseMessage(m);
const matchingGroupItem = findMatchingGroupItem(
Expand All @@ -236,7 +235,7 @@ async function fetchGroupMessages(
return {
// search might include messages that don't match the rule, so we filter those out
messages: messages.filter((message) => message.matchingGroupItem),
nextPageToken: response.data.nextPageToken || undefined,
nextPageToken: response.nextPageToken || undefined,
};
}

Expand Down
8 changes: 2 additions & 6 deletions apps/web/app/api/user/no-reply/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { getGmailClient } from "@/utils/gmail/client";
import { type MessageWithPayload, isDefined } from "@/utils/types";
import { parseMessage } from "@/utils/mail";
import { withError } from "@/utils/middleware";
import { getThread } from "@/utils/gmail/thread";

export type NoReplyResponse = Awaited<ReturnType<typeof getNoReply>>;

Expand All @@ -18,12 +19,7 @@ async function getNoReply(options: { email: string; gmail: gmail_v1.Gmail }) {
const sentEmailsWithThreads = (
await Promise.all(
sentEmails.data.messages?.map(async (message) => {
const thread = (
await options.gmail.users.threads.get({
userId: "me",
id: message.threadId!,
})
).data;
const thread = await getThread(message.threadId!, options.gmail);

const lastMessage = thread.messages?.[thread.messages?.length - 1];
const lastMessageFrom = lastMessage?.payload?.headers?.find(
Expand Down
9 changes: 4 additions & 5 deletions apps/web/app/api/user/rules/[id]/example/controller.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { gmail_v1 } from "@googleapis/gmail";
import { parseMessage } from "@/utils/mail";
import { getMessage } from "@/utils/gmail/message";
import { getMessage, getMessages } from "@/utils/gmail/message";
import type {
MessageWithGroupItem,
RuleWithGroup,
Expand Down Expand Up @@ -66,14 +66,13 @@ async function fetchStaticExampleMessages(
q += `subject:${rule.subject} `;
}

const response = await gmail.users.messages.list({
userId: "me",
const response = await getMessages(gmail, {
maxResults: 50,
q,
query: q,
Comment on lines +69 to +71
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

The review comment is valid - pagination support should be implemented

Based on the code analysis:

  1. The getMessages function in utils/gmail/message.ts accepts a pageToken parameter and returns paginated results from the Gmail API
  2. The codebase already has pagination handling in other places:
    • fetchPaginatedMessages is used for group-related message fetching
    • The /api/google/messages/route.ts endpoint handles nextPageToken

However, in the current implementation at apps/web/app/api/user/rules/[id]/example/controller.ts, the pagination support is not utilized when fetching static example messages, potentially missing relevant messages beyond the first 50 results.

🔗 Analysis chain

Consider handling paginated results.

While fetching 50 messages might suffice for now, getMessages may support pagination. Consider handling the returned nextPageToken to fetch subsequent pages when necessary.


Let me search for more context about the pagination handling in the codebase.


Let me search for more specific information about the getMessages implementation.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Searching usage for `fetchStaticExampleMessages` to confirm it does not require subsequent pages
rg -A 4 "fetchStaticExampleMessages"

Length of output: 901


Script:

#!/bin/bash
# Search for fetchPaginatedMessages implementation and usage
rg -A 10 "fetchPaginatedMessages"

# Search for getMessages implementation to understand pagination support
ast-grep --pattern 'async function getMessages'

Length of output: 3520


Script:

#!/bin/bash
# Search for getMessages implementation and usage
rg -A 15 "getMessages = async|function getMessages"

# Search for files that might contain Gmail API related code
fd -e ts gmail

Length of output: 3111

});

const messages = await Promise.all(
(response.data.messages || []).map(async (message) => {
(response.messages || []).map(async (message) => {
const m = await getMessage(message.id!, gmail);
const parsedMessage = parseMessage(m);
return parsedMessage;
Expand Down
9 changes: 4 additions & 5 deletions apps/web/app/api/user/stats/recipients/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import type { gmail_v1 } from "@googleapis/gmail";
import { auth } from "@/app/api/auth/[...nextauth]/auth";
// import { getGmailClient } from "@/utils/gmail/client";
import { parseMessage } from "@/utils/mail";
import { getMessage } from "@/utils/gmail/message";
import { getMessage, getMessages } from "@/utils/gmail/message";
import {
getDomainsMostSentTo,
getMostSentTo,
Expand All @@ -26,15 +26,14 @@ export type RecipientsResponse = Awaited<ReturnType<typeof getRecipients>>;
async function getRecipients(options: { gmail: gmail_v1.Gmail }) {
const { gmail } = options;

const res = await gmail.users.messages.list({
userId: "me",
q: "in:sent",
const res = await getMessages(gmail, {
query: "in:sent",
maxResults: 50,
});

// be careful of rate limiting here
const messages = await Promise.all(
res.data.messages?.map(async (m) => {
res.messages?.map(async (m) => {
const message = await getMessage(m.id!, gmail);
return parseMessage(message);
}) || [],
Expand Down
Loading