LLM App Tutorial — Building Your Own Auto Email Follow-Up
A Python tutorial to build your first practical but simple LLM tool
Problem
Throughout my job hunt, I found following up on outreach and recruiting emails very painful. You need to be respectful and not pushy, but still get to the point. Following up after 1 day looks desperate, but following up after 5 days is too late. The thought of it gives me a BIG headache, and I end up procrastinating a lot. By the time I finally feel ready to send, the position has already been filled in the competitive US job market. So I use an LLM to draft the follow-up for me if people don’t respond in three days!
Solution
I built an auto-email follow-up solution with Eugene Chan. If an email you sent to a recruiter or an outreach contact hasn’t received a reply for three days, GPT will generate a follow-up draft for you.
Features
An email label to separate recruiter emails from non-recruiter emails
For labeled emails that haven’t been replied to for three days, GPT drafts a follow-up email based on your prior conversation.
The draft is sent to your inbox to remind you to follow up, and you can customize it before sending it.
How did we do it?
The flowchart below describes our entire process. It can be divided into the following three parts.
Python as the main logic flow
AWS Lambda as backend flow
Zapier for Google Authentication and Daily Event Triggering
First, we use Python to create the main logic flow, ensuring that we can send the email from our local machine and that the output draft from GPT is correct. Second, we use AWS Lambda to wrap this Python script as our backend. When an event is triggered, AWS Lambda executes the Python process. Lastly, we use Zapier to handle Gmail authentication and set 9 am as the event trigger time for our AWS Lambda function.
Feel free to review our GitHub repository to learn more about it.
Python as the main logic flow
Step 1: Import libraries
import base64
import json
import os
import re
from base64 import urlsafe_b64encode
from datetime import datetime, timedelta
from email.mime.text import MIMEText

import openai
import pandas as pd
from dotenv import load_dotenv
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

# FOR AWS: the OpenAI key is read from the environment variables
openai.api_key = os.environ['OPENAI_KEY']
Step 2: Fetch all emails and find those with the “Follow-up” label
To test the application locally, you will need to enable the Gmail API and create a service object that connects to it. In the deployed version, this is handled in our app.py code, which takes care of authentication with Zapier and execution on AWS Lambda, so we have not included it in this logic flow. For local testing, you can follow this tutorial to set up the Gmail API.
def find_all_messages(service, follow_up_label_id):
    # Request the list of sent messages that carry the follow-up label.
    # We can also pass maxResults to limit the number of emails returned.
    result = service.users().messages().list(
        userId='me', labelIds=['SENT', follow_up_label_id]).execute()
    messages = result.get('messages')
    return messages


# find the follow up labels
def find_follow_up_label(service):
    try:
        labels_results = service.users().labels().list(userId='me').execute()
        labels = labels_results.get('labels', [])
        print('labels', labels)
        # store the id of the label named "Follow-up"
        follow_up_label_id = None
        for label in labels:
            if label['name'] == 'Follow-up':
                follow_up_label_id = label['id']
        return follow_up_label_id
    except Exception as e:
        print(e)
        return None
Step 3: Check if you are the sender of the last thread
Here, we can view all the threads along with their IDs and determine if we are the sender of the most recent thread.
def get_thread_and_id(service, target_text):
    # find the thread for this email
    thread_id = target_text['threadId']
    thread = service.users().threads().get(userId='me', id=thread_id).execute()
    return thread, thread_id


def check_sender_of_last_thread(thread):
    if 'messages' in thread:
        last_message = thread['messages'][-1]
        if 'payload' in last_message:
            payload = last_message['payload']
            headers = payload['headers']
            for header in headers:
                if header['name'] == 'From':
                    sender = header['value']
                    return sender
    return None
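The From header usually carries a display name along with the address (for example, Jane Doe <jane@example.com>), so comparing raw header strings can be brittle. As a minimal sketch, assuming you know your own address, the standard library's email.utils.parseaddr can normalize the header before comparing. The helper name is_last_message_from_me and the sample thread are hypothetical and not part of our code.

```python
from email.utils import parseaddr


def is_last_message_from_me(thread, my_address):
    """Return True if the newest message in a Gmail thread dict was sent by my_address."""
    messages = thread.get('messages', [])
    if not messages:
        return False
    headers = messages[-1].get('payload', {}).get('headers', [])
    for header in headers:
        if header['name'] == 'From':
            # parseaddr splits "Name <addr>" into (name, addr)
            _, address = parseaddr(header['value'])
            return address.lower() == my_address.lower()
    return False


# Hypothetical thread, shaped like the response of threads().get()
sample_thread = {
    'messages': [
        {'payload': {'headers': [{'name': 'From', 'value': 'Me <me@example.com>'}]}},
        {'payload': {'headers': [{'name': 'From', 'value': 'Recruiter <hr@example.com>'}]}},
    ]
}

# The recruiter wrote last, so no follow-up is needed for this thread
print(is_last_message_from_me(sample_thread, 'me@example.com'))
```

If the function returns True, nobody has answered since your last message, and the thread is a candidate for a follow-up.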
Step 4: Check if the email has not been responded to for 3+ days.
At this point, we only compute the date the email was sent. After we gather all the necessary information in the next step, we apply the three-day filter in Step 6.
def get_subject_sender_receiver_date(headers, target_text):
    # Default to None so that a missing header does not raise an error
    subject = sender = receiver = None
    for d in headers:
        if d['name'] == 'Subject':
            subject = d['value']
        if d['name'] == 'From':
            sender = d['value']
        if d['name'] == 'To' or d['name'] == 'Delivered-To':
            receiver = d['value']
    # internalDate is in milliseconds since the epoch
    internal_date = target_text['internalDate']
    sent_time = datetime.fromtimestamp(
        int(internal_date) / 1000).strftime('%Y-%m-%d %H:%M:%S')
    return subject, sender, receiver, sent_time
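Gmail's internalDate is the message timestamp in milliseconds since the Unix epoch, which is why the function above divides by 1000. As a rough sketch of the three-day check, the comparison could also be done directly on that value. The helper name is_older_than and the fixed "now" are assumptions for illustration. Note that this sketch compares exact elapsed time, while our Step 6 code compares calendar dates, so results can differ by a few hours around the threshold.

```python
from datetime import datetime, timedelta, timezone


def is_older_than(internal_date_ms, days=3, now=None):
    """Return True if a Gmail internalDate (epoch milliseconds) is at least `days` old."""
    now = now or datetime.now(timezone.utc)
    sent = datetime.fromtimestamp(int(internal_date_ms) / 1000, tz=timezone.utc)
    return now - sent >= timedelta(days=days)


# Fixed "now" so that the example is reproducible
now = datetime(2023, 7, 10, 9, 0, tzinfo=timezone.utc)
four_days_ago_ms = int((now - timedelta(days=4)).timestamp() * 1000)
one_day_ago_ms = int((now - timedelta(days=1)).timestamp() * 1000)

print(is_older_than(four_days_ago_ms, now=now))  # old enough to follow up
print(is_older_than(one_day_ago_ms, now=now))    # too early
```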
Step 5: Gather all the information of those emails you want to follow up
The get_body function decodes the base64url-encoded email body to text. The not_replied_emails function gathers all the information needed for the follow-up, including the message ID, subject, thread ID, sender, receiver, sent time, and body.
def get_body(payload):
    '''Find Email body'''
    # The body of the message is base64url-encoded, so we have to decode it.
    parts = payload.get('parts')[0]
    # non pure text
    if 'multipart' in parts['mimeType']:
        for part in parts['parts']:
            if part['mimeType'] == 'text/plain':
                data = part['body']['data']
    # pure text
    else:
        data = parts['body']['data']
    # Map the URL-safe alphabet back to standard base64 before decoding
    data = data.replace("-", "+").replace("_", "/")
    body = base64.b64decode(data).decode('utf-8')
    return body


def not_replied_emails(service, follow_up_label_id):
    '''Collect the email that we want to follow up in a data frame'''
    messages = find_all_messages(service, follow_up_label_id)
    if messages is None:
        return None
    # output data storage
    df = pd.DataFrame(columns=['msgId', 'subject', 'thread_id',
                               'sender', 'receiver', 'sent_time', 'body'])
    # iterate through all the messages
    thread_ids = set()
    for msg in messages:
        # Get the message from its id
        txt = service.users().messages().get(
            userId='me', id=msg['id']).execute()
        target_text = txt
        thread, thread_id = get_thread_and_id(service, target_text)
        last_sender = check_sender_of_last_thread(thread)
        # Use try-except so one malformed email does not stop the loop
        try:
            # Get value of 'payload' from dictionary 'target_text'
            payload = target_text['payload']
            headers = payload['headers']
            subject, sender, receiver, sent_time = get_subject_sender_receiver_date(
                headers, target_text)
            # check if the email is responded or not by seeing the last sender
            # and we haven't checked this thread yet
            if (last_sender == sender) and (thread_id not in thread_ids):
                thread_ids.add(thread_id)
                body = get_body(payload)
                new_row = {'msgId': msg['id'], 'subject': subject,
                           'thread_id': thread_id, 'sender': sender,
                           'receiver': receiver, 'sent_time': sent_time,
                           'body': body}
                df.loc[len(df)] = new_row
        except Exception as e:
            print('Error Occurred: ', e)
    # save as a json data
    df['sent_time'] = df['sent_time'].astype(str)
    df_dict = df.to_dict(orient='records')
    json_data = json.dumps(df_dict)
    return json_data
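The body data from Gmail is base64url text, which uses - and _ in place of + and /. As a minimal sketch of the decoding step, base64.urlsafe_b64decode handles that alphabet directly, so no manual character replacement is needed. The helper name decode_body_data is hypothetical, and the padding fix is a defensive assumption in case the trailing = characters are missing.

```python
import base64


def decode_body_data(data):
    """Decode a base64url string (as found in payload['body']['data']) to text."""
    # Restore any missing '=' padding so that the length is a multiple of 4
    padded = data + '=' * (-len(data) % 4)
    return base64.urlsafe_b64decode(padded).decode('utf-8')


# Round trip with a made-up body; strip the padding to mimic an unpadded value
original = 'Hi Sam, thanks for your time last week!'
encoded = base64.urlsafe_b64encode(original.encode('utf-8')).decode('ascii').rstrip('=')
print(decode_body_data(encoded))
```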
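Step 6: Draft a follow-up with GPT using the OpenAI API
We first clean the data (keep only the emails sent at least three days ago and extract the receiver's address), then generate a follow-up draft for each email using the OpenAI API.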
def data_cleaning(json_str):
    # Convert the string to a DataFrame
    df = pd.read_json(json_str, orient='records')
    # keep only the emails sent at least three days ago
    df['sent_time'] = pd.to_datetime(df['sent_time'])
    # Calculate threshold date (current date - 3 days)
    threshold_date = datetime.now().date() - timedelta(days=3)
    # Filter the dataframe based on the condition
    df = df[df['sent_time'].dt.date <= threshold_date].copy()
    # Extract the bare email address from the 'receiver' column
    df['receiver'] = df['receiver'].apply(extract_email)
    # add an empty column to the dataframe
    df['reply'] = ''
    return df


def extract_email(string):
    # Define the regex pattern
    pattern = r'[\w\.-]+@[\w\.-]+'
    matches = re.findall(pattern, string)
    if matches:
        return matches[0]
    else:
        return ""


def openai_prompt_response(clean_body, receiver, subject):
    try:
        prompt = "I wrote this email: " + clean_body + "\n" + \
            "Can you write a follow-up email for this email I wrote? I don't need a subject, and the email should be less than 100 words, and every sentence should be complete. The email should include the phrase, follow up, in the email body."
        # text-davinci-003 and openai.Completion come from the legacy
        # (pre-1.0) openai package; newer SDK versions use a different interface
        model = "text-davinci-003"
        response = openai.Completion.create(
            engine=model, prompt=prompt, max_tokens=100)
        generated_text = response.choices[0].text
        generated_formatted = "This is a reminder to send a follow-up email to " + receiver + ".\n" + \
            "The email you wrote previously has the subject of: " + subject + \
            "\n\nHere is the drafted follow up for you 😉\n" + generated_text
        return generated_formatted
    except Exception as e:
        print('Error Occurred: ', e)
        return None


def delete_old_thread(input_text):
    # Cut the text at the quoted header of the previous email in the thread
    pattern = r'On [\w\s,]+ at [\d:\s]+[APM]+ [\w\s]+ <[\w.-]+@[\w.-]+>'
    match = re.search(pattern, input_text, re.IGNORECASE)
    if match:
        return input_text[:match.start()]
    else:
        return input_text


def generate_reply(json_str):
    df = data_cleaning(json_str)
    for index, row in df.iterrows():
        msgId = row['msgId']
        subject = row['subject']
        sender = row['sender']
        receiver = row['receiver']
        sent_time = row['sent_time']
        body = row['body']
        # Strip the quoted earlier messages before sending the body to GPT
        clean_body = delete_old_thread(body)
        response = openai_prompt_response(clean_body, receiver, subject)
        df.loc[index, 'reply'] = response
    df['sent_time'] = df['sent_time'].astype(str)
    df_dict = df.to_dict(orient='records')
    openai_json = json.dumps(df_dict)
    return openai_json
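To make the cleaning step concrete, here is a small sketch of how delete_old_thread trims the quoted earlier message that Gmail appends to a reply (the line of the form "On <date> at <time> <name> <email> wrote:"). The pattern is adapted from the code above, so it only matches that specific date format. The sample email and the shortened prompt wording in build_prompt are made up for illustration.

```python
import re

QUOTED_REPLY = r'On [\w\s,]+ at [\d:\s]+[APM]+ [\w\s]+ <[\w.-]+@[\w.-]+>'


def delete_old_thread(input_text):
    """Drop everything from the quoted 'On <date> at <time> <name> <email>' line onward."""
    match = re.search(QUOTED_REPLY, input_text, re.IGNORECASE)
    return input_text[:match.start()] if match else input_text


def build_prompt(clean_body):
    """Assemble the prompt text; the wording is shortened here for illustration."""
    return ("I wrote this email: " + clean_body + "\n"
            "Can you write a follow-up email for this email I wrote?")


sample = ("Hi Sam,\nI wanted to check in about the analyst role.\n\n"
          "On Mon, Jul 3, 2023 at 9:15 AM Sam Lee <sam.lee@example.com> wrote:\n"
          "> Thanks for applying.")

clean = delete_old_thread(sample).strip()
print(build_prompt(clean))
```

Only your own latest message reaches the prompt, which keeps the request short and avoids GPT replying to the quoted text instead.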
Step 7: Send the draft email to yourself
We loop through all the drafted follow-ups and send each reminder to our own inbox, one by one.
def send_one_email(service, sender, receiver, subject, message):
    # Create an email message
    email = MIMEText(message)
    email['to'] = receiver
    email['from'] = sender
    email['subject'] = subject
    # Encode the email content
    raw_email = urlsafe_b64encode(email.as_bytes()).decode('utf-8')
    # Send the email
    try:
        message = service.users().messages().send(
            userId='me', body={'raw': raw_email}).execute()
        print('One email sent successfully!')
        return message
    except HttpError as error:
        print('An error occurred while sending the email:', error)
        return None


# -- Main Function for Sending Email -- #
def send_email_to_all(service, openai_json, email_address):
    df = pd.read_json(openai_json, orient='records')
    email_list = []
    for i in range(len(df)):
        sender = email_address
        real_receiver = df['receiver'][i]
        # The reminder goes to your own inbox, not to the original receiver
        receiver = email_address
        subject = '-- Follow up reminder -- ' + \
            df['subject'][i] + ' -- For ' + real_receiver + ' ---'
        print(subject)
        message = df['reply'][i]
        message = send_one_email(service, sender, receiver, subject, message)
        email_list.append({'id': df['msgId'][i], 'message': message})
    return email_list
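The Gmail API takes the whole email message as a base64url string in the raw field. Here is a minimal sketch of building that payload without sending anything, so it can be checked locally before connecting a real account. The helper name build_raw_message and the addresses are hypothetical.

```python
import base64
from email import message_from_bytes
from email.mime.text import MIMEText


def build_raw_message(sender, receiver, subject, message):
    """Return the {'raw': ...} body that messages().send() takes."""
    email = MIMEText(message)
    email['to'] = receiver
    email['from'] = sender
    email['subject'] = subject
    raw = base64.urlsafe_b64encode(email.as_bytes()).decode('utf-8')
    return {'raw': raw}


body = build_raw_message('me@example.com', 'me@example.com',
                         '-- Follow up reminder -- Analyst role',
                         'Here is the drafted follow up for you')

# Decode it again to confirm the headers and text survived the encoding
decoded = message_from_bytes(base64.urlsafe_b64decode(body['raw']))
print(decoded['subject'])
print(decoded.get_payload())
```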
AWS Lambda as backend flow
Auto Follow-up consists of two components: a Zapier custom integration that serves as the interface for authentication and cron job orchestration, and a serverless function that handles the follow-up draft generation and sends reminders.
Our Python logic flow is administered through AWS Lambda. The code we execute on Lambda is referred to as a “Lambda function.” Lambda can automatically trigger functions (such as sending emails) when a specified event occurs (for example, Zapier’s daily checking time). If you are unfamiliar with AWS Lambda, you can check out an introduction here.
Building a serverless API with AWS
AWS offers a serverless application model (SAM) that simplifies the deployment of serverless code as Lambda functions and enables it to be served through APIs with the Amazon API Gateway. AWS’s serverless architecture includes a tool called SAM-CLI, which automates the compilation and deployment of serverless functions to an AWS cluster.
Our script for fetching all email threads that need a reply, generating a follow-up draft, and sending reminders to the user’s email lives in a single Python script that is wrapped as a Lambda function.
The AWS Serverless Application Model helps us create an API endpoint that receives requests and sends responses on behalf of the Lambda function. It ensures that our Lambda function receives the payload from the API request and sends the response back to the user through the API.
All the integration between the Lambda function and API Gateway is specified in the template.yaml file for the AWS Serverless Application Model.
GenerateFollowUpFunction:
  Type: AWS::Serverless::Function
  Properties:
    Handler: app.generate_follow_up_handler
    Runtime: python3.9
    Architectures:
      - x86_64
    Events:
      GenerateFollowUp:
        Type: Api # specifying the event type as Api creates an API endpoint
                  # with AWS API Gateway
        Properties:
          Path: /generate-follow-up
          Method: post
Once the function is set up as an API, an endpoint is created when the serverless application is deployed with SAM-CLI. The request payload can be retrieved in the Lambda function from event['body'].
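With the API Gateway proxy integration, event['body'] arrives as a string (JSON text in our case), and it is base64-encoded when isBase64Encoded is true. Here is a minimal sketch of reading it defensively. The helper name parse_event_body and the email_address field are assumptions for illustration, not necessarily the fields our integration sends.

```python
import base64
import json


def parse_event_body(event):
    """Return the JSON payload of an API Gateway proxy event as a dict."""
    body = event.get('body') or '{}'
    if event.get('isBase64Encoded'):
        body = base64.b64decode(body).decode('utf-8')
    return json.loads(body)


# Hypothetical event, shaped like what API Gateway passes to the handler
event = {
    'isBase64Encoded': False,
    'body': json.dumps({'email_address': 'me@example.com'}),
}
print(parse_event_body(event)['email_address'])
```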
The challenging part is passing the response from the Lambda function through the API Gateway and back to the Zapier custom integration. The API Gateway expects the Lambda function's response in a specific format. If the format is not followed, the API Gateway will not pass the response back to the client, which in our case is the Zapier custom integration.
{
    "isBase64Encoded": false,
    "statusCode": [[status code]],
    "body": [[string body]],
    "headers": {
        "content-type": "application/json"
    }
}
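As a sketch of a handler that returns this shape, the helper below serializes a Python object into the string body and fills in the other fields. The names make_response and example_handler are hypothetical, and the handler body is only a placeholder for the Gmail and GPT work done by the real generate_follow_up_handler.

```python
import json


def make_response(status_code, payload):
    """Wrap a payload in the response format that API Gateway expects from Lambda."""
    return {
        'isBase64Encoded': False,
        'statusCode': status_code,
        # body must be a string, so the payload is serialized to JSON text
        'body': json.dumps(payload),
        'headers': {'content-type': 'application/json'},
    }


def example_handler(event, context):
    """Placeholder handler: echo the request back in the expected format."""
    try:
        request = json.loads(event.get('body') or '{}')
        return make_response(200, {'received': request})
    except json.JSONDecodeError:
        return make_response(400, {'error': 'body is not valid JSON'})


print(example_handler({'body': '{"ping": 1}'}, None))
```

Returning an error response in the same format, instead of letting the exception escape, means the client still gets a readable status code when the request is malformed.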
Zapier for Google Authentication and Daily Event Triggering
Scheduling the reminder every day
To automatically check and generate follow-up reminders every day, we initially built a cron job worker. However, we ended up using Zapier’s custom integration platform. Zapier is a no-code workflow tool that allows people to integrate apps and chain them into automatic workflows with an “If-this-then-that” structure.
While a cron job worker may seem like the most obvious choice, we ultimately chose to build our generation on Zapier for the following reasons:
We needed to instantiate and orchestrate a cron job worker for each user (specifically, each Gmail account).
We needed to create a UI for users to authenticate with their Gmail account and grant us access to their sent emails.
We also needed to store the Gmail tokens for each user and handle token refresh, as users would not log in with Google every day when the cron job runs.
Zapier turned out to be the best solution for running cron jobs for multiple users. It handles authentication and refreshing of Gmail credentials, has a scheduler that runs the cron job for us, and provides an intuitive UI for users to set it up.
How did it work?
To connect these two components, we expose an API from the serverless function to the Zapier custom integration. When the Zapier integration is triggered (e.g. daily, weekly), it calls the API to check for emails that need follow-up. Then, it sends a reminder and a drafted follow-up message from GPT back to the user’s email.
Although this appears to be a normal API flow, there are several challenges in connecting Zapier to the serverless API function (i.e., an AWS Lambda function):
Zapier has a hard timeout of 30 seconds. However, depending on the number of follow-ups that need to be drafted and sent as reminders, our process often takes longer than 30 seconds.
The AWS Lambda function does not automatically expose itself as an API. It needs to be wrapped in an API gateway to handle all the REST API requests and responses.
Working within Zapier’s hard timeout
Zapier provides an “escape hatch” for custom integrations that may take more than 30 seconds to give a proper response. This method is called performResume. It allows the Zapier integration to call an external API server and pass it a callback URL, which the external server calls once the long operation is completed.
While Zapier is waiting, the workflow remains in a pending state (similar to a Promise in JavaScript). When the external server calls the callback URL, Zapier triggers the performResume function defined for the custom integration to complete the workflow.
In our case, since generating and sending follow-up reminders may take more than 30 seconds, Zapier generates a callback URL and passes it, along with the other required payload, when it first calls our generate follow-up reminder API.
Once the generation and sending of reminders are complete, our backend posts a request to the callback URL that Zapier provided and triggers the performResume function, which returns the email reminder sent by our backend.
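On the backend side, this last step is just an HTTP POST to the callback URL that Zapier passed in. Here is a minimal sketch using only the standard library. The function names, the callback URL, and the payload fields are placeholders, and building the request is kept separate from sending it so that it can be inspected without touching the network.

```python
import json
import urllib.request


def build_callback_request(callback_url, result):
    """Build (but do not send) the POST request that resumes the Zapier workflow."""
    return urllib.request.Request(
        callback_url,
        data=json.dumps(result).encode('utf-8'),
        headers={'Content-Type': 'application/json'},
        method='POST',
    )


def notify_zapier(callback_url, result, timeout=10):
    """Send the result to Zapier; call this once the long-running work is done."""
    request = build_callback_request(callback_url, result)
    with urllib.request.urlopen(request, timeout=timeout) as resp:
        return resp.status


# Placeholder URL; the real one comes from the Zapier request payload
req = build_callback_request('https://example.com/zapier-callback',
                             {'emails_sent': 2})
print(req.get_method(), req.full_url)
```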
Results
I shared this tool on LinkedIn, helping out 50+ friends and connections! 🎉