-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlambda_function.py
More file actions
1660 lines (1381 loc) · 61.9 KB
/
lambda_function.py
File metadata and controls
1660 lines (1381 loc) · 61.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
"""
AWS Lambda function for email automation with SES, S3, and Backend API.
Processes incoming emails, performs lead qualification, and sends automated replies.
This version uses the backend API instead of direct MongoDB connections.
"""
import json
import os
import base64
import boto3
import email
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from typing import Dict, List, Literal, Optional, Any, Union, Tuple
from datetime import datetime, timezone
from dataclasses import dataclass
import re
# Third-party imports
from pydantic import BaseModel, Field
from enum import Enum
from agents import Agent, Runner
# API client
from api_client import (
APIClient,
create_api_client,
convert_lead_to_api_format,
convert_message_to_api_format,
convert_deal_to_api_format,
)
from database.models.user import User, UserPreferences, PartnershipPreferences
from database.models.lead import (
LeadQualificationData,
LeadStatus,
LeadDealState,
ConversationState,
)
from database.models.deal import (
Deal,
DealStatus,
DealType,
DueDateType,
Deliverable,
Contact,
Brief,
)
# Runtime configuration, read from the process environment at import time.
S3_BUCKET = os.getenv("S3_BUCKET")  # None when the variable is not set
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
SES_REGION = os.getenv("SES_REGION", "us-east-2")

# Module-level boto3 clients, created once when the module is imported.
s3_client = boto3.client("s3")
ses_client = boto3.client("ses", region_name=SES_REGION)
# =============================
# Deal/Conversation FSM Helpers
# =============================
def _next_conversation_state_after_parse(lead: LeadQualificationData) -> ConversationState:
    """Choose the follow-up conversation state from the lead's missing fields.

    Blockers are checked in priority order: budget, then deliverables, then
    brand/contact details. If none of those are missing the conversation
    moves on to negotiating terms.
    """
    outstanding = frozenset(lead.get_missing_fields())
    brand_detail_fields = (
        "Brand Name",
        "Brand Website",
        "Brand Category",
        "Brand Contact Name",
        "Brand Contact Email",
    )

    if "Budget" in outstanding:
        return ConversationState.AWAITING_BUDGET
    if "Deliverables" in outstanding:
        return ConversationState.AWAITING_DELIVERABLES
    if not outstanding.isdisjoint(brand_detail_fields):
        return ConversationState.AWAITING_DETAILS

    # Nothing critical is missing; the negotiate/finalize decision happens later.
    return ConversationState.NEGOTIATING_TERMS
async def run_deal_conversation_fsm(
    api_client: APIClient,
    user: User,
    conversation_history: str,
    sender_email: str,
    thread_id: str,
) -> Union["QAPolisherResponse", str, None]:
    """Drive the deal + conversation state machine for one lead/thread.

    Steps:
      1. Load the lead for ``thread_id``; if there is none, classify the
         conversation and only create a lead for confident sponsorship mail.
      2. For leads in NEW_INQUIRY / INFO_GATHERING, run the parser agent,
         persist the parsed data and the next FSM states, and ask for any
         missing details.
      3. For leads in EVALUATING / NEGOTIATING, run the preference agent and
         act on its recommendation (REJECT, NEGOTIATE, or CONTINUE).

    Args:
        api_client: Backend API client used for lead/deal persistence.
        user: The creator account the email was sent to.
        conversation_history: Pre-formatted thread history for agent prompts.
        sender_email: Address of the external sender.
        thread_id: Identifier of the email thread.

    Returns:
        A ``QAPolisherResponse`` (polished reply), a plain string reply, or
        ``None`` when no reply should be sent.
    """
    # NOTE: the return annotation uses typing.Union with a forward reference.
    # The previous form `"QAPolisherResponse" | str | None` is evaluated when
    # the function is defined and raises TypeError (str | type).

    # 1) Ensure or create lead for this thread (classification gates this below)
    existing_lead = await get_lead_by_thread_id(api_client, thread_id)

    # If no existing lead, classify first to decide if we should even create a lead
    if not existing_lead:
        filter_input = f"""
CONVERSATION HISTORY:
{conversation_history}
Classify this email conversation into one of the three categories and provide the appropriate subcategory.
"""
        filter_result = await Runner.run(deal_filter_agent, filter_input)
        filter_response = filter_result.final_output_as(DealFilterResponse)

        if filter_response.classification != "Sponsorship" or filter_response.confidence < 0.75:
            # Non-sponsorship → standard response
            return (
                "Thank you for reaching out! This message appears to be outside the scope of business partnership inquiries that I can assist with. If you have a collaboration or partnership proposal, please feel free to send those details and I'll be happy to help connect you with the right person."
            )

        # Create initial lead with default FSM states
        created_lead = await create_lead(api_client, sender_email, thread_id, user.uuid)
        if not created_lead:
            return "I apologize, but there was an issue processing your request. Please try again."
        existing_lead = created_lead

    # Normalize into model
    lead_model = LeadQualificationData.model_validate(existing_lead)

    # 2) Conversation state selection and agent invocation
    # For NEW_INQUIRY/INITIAL_OUTREACH, parse and gather info first
    if lead_model.deal_state in {LeadDealState.NEW_INQUIRY, LeadDealState.INFO_GATHERING}:
        parser_input = f"""
EXISTING LEAD INFORMATION, ANY FIELD THAT IS RELATED TO LEAD QUALIFICATION AND IS NULL IS UNKNOWN:
{existing_lead}
CONVERSATION HISTORY:
{conversation_history}
Extract and update comprehensive structured deal information from this sponsorship opportunity conversation.
Build upon existing information and extract ALL possible fields mentioned or implied.
"""
        parser_result = await Runner.run(deal_parser_agent, parser_input)
        parser_response = parser_result.final_output_as(LeadQualificationData)

        # Update lead with parsed information
        updated_lead_dict = await update_lead(api_client, existing_lead.get("_id"), thread_id, parser_response)
        if not updated_lead_dict:
            return "I apologize, but there was an issue processing your request. Please try again."
        lead_model = LeadQualificationData.model_validate(updated_lead_dict)

        # Transition conversation state based on missing fields
        next_conv = _next_conversation_state_after_parse(lead_model)
        new_deal_state = LeadDealState.INFO_GATHERING if next_conv in {
            ConversationState.AWAITING_DETAILS,
            ConversationState.AWAITING_BUDGET,
            ConversationState.AWAITING_DELIVERABLES,
        } else LeadDealState.EVALUATING

        # Persist FSM states
        lead_model.conversation_state = next_conv
        lead_model.deal_state = new_deal_state
        await update_lead(api_client, existing_lead.get("_id"), thread_id, lead_model)

        # Prepare targeted prompt asking for missing info (if needed)
        if new_deal_state == LeadDealState.INFO_GATHERING:
            missing = ", ".join(lead_model.get_missing_fields())
            draft = (
                f"Thanks for reaching out! To move forward, could you please share: {missing}? This will help us confirm scope and next steps."
            )
            polisher_input = f"""
DRAFT RESPONSE TO POLISH:
{draft}
CONTEXT:
- Recommendation: CONTINUE
- Tone: professional
- Key Messages: requesting missing details
REQUIREMENTS:
- Keep under 125 words
- Maintain the professional tone
- Ensure professional talent agent voice
- No email signature at the end of the response
"""
            polished = await Runner.run(qa_polisher_agent, polisher_input)
            return polished.final_output_as(QAPolisherResponse)
        # Otherwise the lead is now EVALUATING and falls through to step 3.

    # 3) Evaluation / Negotiation phase
    if lead_model.deal_state in {LeadDealState.EVALUATING, LeadDealState.NEGOTIATING}:
        # Build preference evaluation input
        creator_name = user.profile.name or "our creator"
        prefs = user.preferences
        platform_context = ""
        if user.platforms:
            primary_platforms = [
                p.platformType.value.title() for p in user.platforms[:3] if p.isActive
            ]
            if primary_platforms:
                platform_context = f"Primary Platforms: {', '.join(primary_platforms)}"

        preference_input = f"""
COMPREHENSIVE LEAD INFORMATION:
{lead_model.model_dump_json(indent=2)}
MISSING FIELDS:
{', '.join(lead_model.get_missing_fields())}
CREATOR PREFERENCES:
- Name: {creator_name}
- Minimum Rate: ${prefs.absoluteMinimumRate}
- Auto-Reject Categories: {', '.join(prefs.autoRejectCategories)}
- Partnership Types: Flat Rate: {prefs.partnershipTypes.flatRate}, Affiliate: {prefs.partnershipTypes.affiliate}, Performance Hybrid: {prefs.partnershipTypes.performanceHybrid}
- {platform_context}
CONVERSATION HISTORY:
{conversation_history}
"""
        pref_result = await Runner.run(comprehensive_preference_agent, preference_input)
        pref = pref_result.final_output_as(PreferenceEvaluationDraftResponse)

        # Map recommendation to deal state and actions
        if pref.recommendation == "REJECT":
            lead_model.deal_state = LeadDealState.REJECTED
            lead_model.conversation_state = ConversationState.CLOSED
            await update_lead(api_client, existing_lead.get("_id"), thread_id, lead_model)
            # Send polite decline
            polisher_input = f"""
DRAFT RESPONSE TO POLISH:
{pref.draft_response}
CONTEXT:
- Recommendation: REJECT
- Tone: {pref.communication_tone}
- Key Messages: {', '.join(pref.key_messages)}
"""
            polished = await Runner.run(qa_polisher_agent, polisher_input)
            return polished.final_output_as(QAPolisherResponse)

        if pref.recommendation == "NEGOTIATE":
            lead_model.deal_state = LeadDealState.NEGOTIATING
            lead_model.conversation_state = ConversationState.NEGOTIATING_TERMS
            await update_lead(api_client, existing_lead.get("_id"), thread_id, lead_model)
            polisher_input = f"""
DRAFT RESPONSE TO POLISH:
{pref.draft_response}
CONTEXT:
- Recommendation: NEGOTIATE
- Tone: {pref.communication_tone}
- Key Messages: {', '.join(pref.key_messages)}
"""
            polished = await Runner.run(qa_polisher_agent, polisher_input)
            return polished.final_output_as(QAPolisherResponse)

        # CONTINUE → if qualifies, create deal and finalize; else ask for missing info
        qualified = check_qualification_criteria(lead_model, user.preferences)
        if qualified:
            deal_id = await create_deal_from_qualified_lead(api_client, lead_model)
            if deal_id:
                # Ensure conversation exists for this deal and thread.
                # Still best-effort, but the failure is now logged instead of
                # being silently discarded.
                try:
                    await api_client.create_conversation_for_deal(deal_id, thread_id)
                except Exception as e:
                    print(f"Error creating conversation for deal {deal_id}: {e}")
                lead_model.deal_state = LeadDealState.ACCEPTED
                lead_model.conversation_state = ConversationState.FINALIZING
                await update_lead(api_client, existing_lead.get("_id"), thread_id, lead_model)
                return "Thanks for working with Repflow! We'll get back to you soon."

        # Not yet qualified → request missing info
        # NOTE(review): this path is also reached when the lead has no missing
        # fields (qualification failed for another reason, or deal creation
        # returned no id); `missing` is then empty and the draft reads
        # "could you confirm: ?". Confirm the intended reply for that case.
        missing = ", ".join(lead_model.get_missing_fields())
        draft = (
            f"Appreciate the details so far. To keep things moving, could you confirm: {missing}? This will let us lock scope and next steps."
        )
        polisher_input = f"""
DRAFT RESPONSE TO POLISH:
{draft}
CONTEXT:
- Recommendation: CONTINUE
- Tone: professional
- Key Messages: requesting missing details
"""
        polished = await Runner.run(qa_polisher_agent, polisher_input)
        return polished.final_output_as(QAPolisherResponse)

    # Default: no action
    return None
# Models and Enums (same as original)
class DealFilterResponse(BaseModel):
    """Structured output of the Deal Filter Agent: category, subcategory, confidence."""

    # Top-level category label produced by the classifier.
    classification: str = Field(..., description="Sponsorship | Non-Sponsorship | Other")
    # Finer-grained label within the category.
    subcategory: str = Field(
        ...,
        description="Specific subcategory like flat, affiliate, hybrid, gifting, ugc, event, press, recruiting, spam, etc.",
    )
    # Classifier confidence, described to the model as a 0-1 score.
    confidence: float = Field(..., description="Confidence score for the classification (0-1)")
class PreferenceEvaluationDraftResponse(BaseModel):
    """Structured output of the Preference Evaluation Agent: verdict plus a draft reply."""

    # --- Preference evaluation ---
    # One of the three allowed verdicts.
    recommendation: Literal["CONTINUE", "NEGOTIATE", "REJECT"] = Field(
        ..., description="CONTINUE, NEGOTIATE, REJECT"
    )

    # --- Communication strategy ---
    communication_tone: str = Field(
        default="professional", description="Recommended tone: professional, warm, firm, etc."
    )
    key_messages: List[str] = Field(default=[], description="Key messages to convey in response")
    draft_response: str = Field(..., description="Complete draft response based on recommendation")
class QAPolisherResponse(BaseModel):
    """Structured output of the QA / talent-agent polisher: final text and a send flag."""

    # The reply text to send.
    polished_reply: str = Field(..., description="Final polished email text under 125 words")
    # Send flag; defaults to True.
    should_send: bool = Field(default=True, description="Whether the email should be sent")
@dataclass
class EmailEvent:
    """One inbound email as described by an SES event, with its S3 bucket/key."""

    # Identity
    message_id: str
    thread_id: str

    # Envelope / headers
    sender_email: str
    recipient_email: str
    subject: str

    # Body text and receive time
    content: str
    timestamp: datetime

    # S3 bucket and key associated with the message
    s3_bucket: str
    s3_key: str
# Database Operations via API
async def get_lead_by_thread_id(api_client, thread_id: str) -> Optional[Dict]:
    """Look up the lead record for a thread through the backend API.

    Returns the API client's result for ``thread_id``. Any exception is
    logged and reported as ``None``.
    """
    record = None
    try:
        record = await api_client.get_lead_by_thread_id(thread_id)
        if record:
            print(f"Retrieved lead with ID: {record.get('_id')}")
    except Exception as exc:
        print(f"Error fetching lead by thread_id {thread_id}: {exc}")
        record = None
    return record
async def create_lead(
    api_client: APIClient, email: str, thread_id: str, userId: str
) -> Optional[Dict]:
    """Create a lead record in its initial FSM states via the backend API.

    Returns the created lead as returned by the API client, or ``None`` when
    anything raises (the error is logged).
    """
    try:
        timestamp = datetime.now(timezone.utc)
        payload = convert_lead_to_api_format(
            {
                "email": email,
                "thread_id": thread_id,
                "userId": userId,
                "status": "new",
                # FSM initial states
                "deal_state": LeadDealState.NEW_INQUIRY.value,
                "conversation_state": ConversationState.INITIAL_OUTREACH.value,
                "created_at": timestamp,
                "updated_at": timestamp,
            }
        )
        created = await api_client.create_lead(payload)
        print(f"Created lead with ID: {created.get('_id')}")
        return created
    except Exception as exc:
        print(f"Error creating lead: {exc}")
        return None
async def update_lead(
    api_client: APIClient, leadID: str, thread_id: str, leadData: LeadQualificationData
) -> dict | None:
    """Persist ``leadData`` for an existing lead via the backend API.

    ``leadData.thread_id`` is overwritten with ``thread_id`` before the model
    is dumped and converted. Returns the API client's result, or ``None``
    when ``leadID`` is empty or anything raises.
    """
    try:
        if not leadID:
            print("ERROR: Lead missing ID field")
            return None

        print(f"Updating lead {leadID}")
        leadData.thread_id = thread_id
        # Dump the model and convert it to the API format
        payload = convert_lead_to_api_format(leadData.model_dump())
        return await api_client.update_lead(leadID, payload)
    except Exception as exc:
        print(f"Error updating lead: {exc}")
        return None
async def mark_lead_qualified(api_client, thread_id: str) -> bool:
    """Ask the backend to mark the thread's lead as qualified.

    Returns True only when the API client returns a truthy result. Falsy
    results and exceptions (which are logged) both yield False.
    """
    try:
        outcome = await api_client.mark_lead_qualified(thread_id)
        if not outcome:
            return False
        print(f"Marked lead as qualified for thread {thread_id}")
        return True
    except Exception as exc:
        print(f"Error marking lead qualified: {exc}")
        return False
def check_qualification_criteria(
    lead: LeadQualificationData, preferences: UserPreferences
) -> bool:
    """
    Decide whether a lead is ready to be turned into a deal.

    Checks, in order (the first failure logs a reason and returns False):
    1. No fields reported by ``lead.get_missing_fields()``
    2. A budget is present and its parsed value is at least
       ``preferences.absoluteMinimumRate``
    3. The brand category contains none of ``preferences.autoRejectCategories``
       (case-insensitive substring match)
    4. The deal type derived from the budget text is accepted by
       ``preferences.partnershipTypes``
    5. At least one of contact name, contact email, or lead email is set
    6. Brand name has at least 2 characters and every deliverable at least 5
       (after stripping whitespace)

    Args:
        lead: Lead qualification model to evaluate
        preferences: User preferences used for filtering

    Returns:
        bool: True if every check passes; False otherwise, including when an
        exception is raised during evaluation
    """
    try:
        # 1. Completeness
        incomplete = lead.get_missing_fields()
        if len(incomplete):
            print(f"QUALIFICATION FAILED: Lead is incomplete: {incomplete}")
            return False

        # 2. Budget vs. minimum rate
        if not lead.budget:
            print("QUALIFICATION FAILED: No budget information provided")
            return False
        amount = extract_budget_value(lead.budget)
        if amount < preferences.absoluteMinimumRate:
            print(
                f"QUALIFICATION FAILED: Budget ${amount} below minimum rate ${preferences.absoluteMinimumRate}"
            )
            return False

        # 3. Auto-reject categories
        brand_category = (lead.brand_category or "").lower()
        if any(blocked.lower() in brand_category for blocked in preferences.autoRejectCategories):
            print(
                f"QUALIFICATION FAILED: Brand category '{brand_category}' is in auto-reject list"
            )
            return False

        # 4. Partnership type compatibility
        deal_type = determine_deal_type_from_budget(lead.budget or "")
        if not is_partnership_type_accepted(deal_type, preferences.partnershipTypes):
            print(
                f"QUALIFICATION FAILED: Deal type '{deal_type}' not accepted in partnership preferences"
            )
            return False

        # 5. Some way to reach the brand (lead email is the fallback)
        if not (lead.brand_contact_name or lead.brand_contact_email or lead.email):
            print("QUALIFICATION FAILED: No contact information available")
            return False

        # 6. Minimum lengths on critical fields
        if lead.brand_name and len(lead.brand_name.strip()) < 2:
            print("QUALIFICATION FAILED: Brand name too short")
            return False
        if any(len(item.strip()) < 5 for item in (lead.deliverables or [])):
            print("QUALIFICATION FAILED: Deliverables description too short")
            return False

        print(f"QUALIFICATION PASSED: Lead for {lead.brand_name} meets all criteria")
        return True
    except Exception as exc:
        print(f"QUALIFICATION ERROR: Exception during qualification check: {exc}")
        return False
def extract_budget_value(budget_str: str) -> float:
    """Extract the first numeric amount from a free-form budget string.

    "$", "," and "USD" are stripped before parsing. Decimals of any length
    are kept, and a trailing "k"/"K" directly after the number (not followed
    by another letter) multiplies it by 1,000, so "$5k" yields 5000.0 rather
    than 5.0. For a range such as "1000-5000" the first number, i.e. the
    lower bound, is returned.

    Args:
        budget_str: Budget description, e.g. "$2,500 USD" or "2.5k per video".

    Returns:
        The parsed amount, or 0.0 when the input is empty, is not a string,
        or contains no number.
    """
    if not budget_str:
        return 0.0
    try:
        # Remove common currency symbols and thousands separators
        cleaned = budget_str.replace("$", "").replace(",", "").replace("USD", "").strip()
        # Number, optionally followed by a thousands suffix. The lookahead
        # keeps units such as "kg" from being read as a multiplier.
        match = re.search(r"(\d+(?:\.\d+)?)([kK](?![A-Za-z]))?", cleaned)
        if match is None:
            return 0.0
        amount = float(match.group(1))
        if match.group(2):
            amount *= 1000
        return amount
    except (ValueError, AttributeError):
        # Non-string input (no .replace) or an unparsable number
        return 0.0
def determine_deal_type_from_budget(budget_str: str) -> str:
    """Map keywords in a budget description to a deal-type label.

    Rules are tried in order and the first one with a matching keyword wins;
    matching is a case-insensitive substring test. Empty input, or text with
    no matching keyword, yields "FLAT_RATE".
    """
    if not budget_str:
        return "FLAT_RATE"

    text = budget_str.lower()
    # (keywords, label) pairs in priority order
    keyword_rules = (
        (("affiliate", "commission", "%"), "AFFILIATE"),
        (("ugc",), "UGC"),
        (("sponsored",), "SPONSORED_POST"),
        (("partnership",), "BRAND_PARTNERSHIP"),
        (("revenue", "share"), "REVENUE_SHARE"),
        (("performance", "hybrid"), "PERFORMANCE_HYBRID"),
    )
    for keywords, label in keyword_rules:
        if any(word in text for word in keywords):
            return label
    return "FLAT_RATE"
def is_partnership_type_accepted(
    deal_type: str, partnership_prefs: PartnershipPreferences
) -> bool:
    """Return the preference flag that governs ``deal_type``.

    FLAT_RATE, UGC and SPONSORED_POST use ``flatRate``; AFFILIATE uses
    ``affiliate``; REVENUE_SHARE and PERFORMANCE_HYBRID use
    ``performanceHybrid``; BRAND_PARTNERSHIP and any unknown type use
    ``custom``.
    """
    # Read every flag up front, as the lookup table in the original did.
    flat_rate_ok = partnership_prefs.flatRate
    affiliate_ok = partnership_prefs.affiliate
    custom_ok = partnership_prefs.custom
    hybrid_ok = partnership_prefs.performanceHybrid

    if deal_type in ("FLAT_RATE", "UGC", "SPONSORED_POST"):
        return flat_rate_ok
    if deal_type == "AFFILIATE":
        return affiliate_ok
    if deal_type in ("REVENUE_SHARE", "PERFORMANCE_HYBRID"):
        return hybrid_ok
    # BRAND_PARTNERSHIP and anything unrecognised fall under "custom"
    return custom_ok
async def create_deal_from_qualified_lead(
    api_client, lead: LeadQualificationData
) -> Optional[str]:
    """Build a Deal from a qualified lead and create it through the API.

    Returns the ``"id"`` of the created deal, or ``None`` when the API
    returns nothing or any step raises (the error is logged).
    """
    try:
        raw_budget = lead.budget or ""

        # Numeric value parsed from the budget text; 1000.0 when none is found
        amount = extract_budget_value(raw_budget) or 1000.0

        # Deal-type label -> DealType member; unmapped labels use FLAT_RATE
        type_label = determine_deal_type_from_budget(raw_budget)
        known_types = {
            label: getattr(DealType, label)
            for label in (
                "FLAT_RATE",
                "AFFILIATE",
                "UGC",
                "SPONSORED_POST",
                "BRAND_PARTNERSHIP",
                "REVENUE_SHARE",
            )
        }
        resolved_type = known_types.get(type_label, DealType.FLAT_RATE)

        # One structured deliverable per deliverable string on the lead
        deliverable_items = [
            Deliverable(
                contentType="mixed",
                text=item,
                draftLink=None,
                invoiceLink=None,
            )
            for item in (lead.deliverables or [])
        ]

        # Primary contact, when the lead carries a contact name or email
        key_contacts = []
        if lead.brand_contact_name or lead.brand_contact_email:
            key_contacts.append(
                Contact(
                    name=lead.brand_contact_name or "Unknown",
                    role=f"Primary Contact ({lead.contact_type or 'Brand Direct'})",
                    email=lead.brand_contact_email,
                    avatar=None,
                    phone=None,
                )
            )

        # Brief is a required field; placeholder values are used
        placeholder_brief = Brief(
            link="https://placeholder.com/brief",
            promoCode="PENDING",
        )

        due_on = lead.deadline.date() if lead.deadline else None

        # Free-text comments carrying every lead field
        comment_sections = (
            [f"Brand Comment: {lead.brand_comment}"] if lead.brand_comment else []
        )
        lead_summary = " | ".join(
            (
                f"Brand: {lead.brand_name or 'Unknown'}",
                f"Website: {lead.brand_website or 'Not provided'}",
                f"Category: {lead.brand_category or 'Unknown'}",
                f"Contact: {lead.brand_contact_name or 'Unknown'} ({lead.brand_contact_email or 'No email'})",
                f"Contact Type: {lead.contact_type or 'Unknown'}",
                f"Budget: {lead.budget or 'Not specified'}",
                f"Deadline: {lead.deadline or 'TBD'}",
                f"Deliverables: {lead.deliverables or 'Not specified'}",
            )
        )
        comment_sections.append("Lead Details: " + lead_summary)
        comment_sections.append("Source: Email automation")

        company_name = lead.brand_name or "Unknown Company"
        # NOTE(review): userId is sent as None even though leads are created
        # with a userId — confirm the backend assigns ownership.
        deal = Deal(
            company=company_name,
            title=f"Partnership with {company_name}",
            status=DealStatus.NEW_OFFER,
            dealType=resolved_type,
            value=amount,
            valueCurrency="USD",
            dueDate=due_on,
            dueDateType=DueDateType.REMINDER,
            reminderDays=3,
            isPriority=False,
            isHighValue=amount >= 5000,
            isAiPaused=False,
            dateReceived=datetime.now(timezone.utc).date(),
            comments=" | ".join(comment_sections),
            brief=placeholder_brief,
            deliverables=deliverable_items,
            communicationHistory=[],
            keyContacts=key_contacts,
            timeInStage=0,
            userId=None,
            createdBy="email_automation",
            updatedBy="email_automation",
        )

        # NOTE(review): the payload is the raw model_dump(); the file imports
        # convert_deal_to_api_format but it is not applied here — confirm
        # whether api_client.create_deal expects converted date values.
        created = await api_client.create_deal(deal.model_dump())
        if not created:
            return None

        deal_id = created.get("id")
        print(
            f"Successfully created deal with ID: {deal_id} for company: {deal.company}"
        )
        return deal_id
    except Exception as exc:
        print(f"Error creating deal from qualified lead: {exc}")
        return None
async def was_email_processed(api_client, message_id: str) -> bool:
    """Ask the backend whether ``message_id`` has already been handled.

    Returns the API client's answer. If the check raises, the error is
    logged and False is returned.
    """
    try:
        already_seen = await api_client.check_email_processed(message_id)
    except Exception as exc:
        print(f"Error checking processed email: {exc}")
        return False
    return already_seen
async def mark_email_processed(api_client, message_id: str, thread_id: str) -> bool:
    """Record ``message_id`` as processed for ``thread_id`` via the backend.

    Returns True when the API client returns anything other than None, and
    False when it returns None or raises (the error is logged).
    """
    try:
        outcome = await api_client.mark_email_processed(message_id, thread_id)
    except Exception as exc:
        print(f"Error marking email processed: {exc}")
        return False
    return outcome is not None
async def is_blacklisted(api_client, email: str) -> bool:
    """Check the backend blacklist for ``email`` (lower-cased before lookup).

    Returns the API client's answer. Any exception, including one from
    lower-casing the address, is logged and reported as False.
    """
    try:
        address = email.lower()
        return await api_client.check_email_blacklisted(address)
    except Exception as exc:
        print(f"Error checking blacklist: {exc}")
        return False
async def store_message(api_client, message_data: Dict) -> bool:
    """Convert ``message_data`` to the API format and store it via the backend.

    Returns True when the API client returns a truthy result. Falsy results
    and exceptions (which are logged) yield False.
    """
    try:
        payload = convert_message_to_api_format(message_data)
        stored = await api_client.create_simple_message(payload)
        if not stored:
            return False
        print(f"Stored message {payload['message_id']} via API")
        return True
    except Exception as exc:
        print(f"Error storing message: {exc}")
        return False
async def get_conversation_history(
    api_client, thread_id: str, limit: int = 50
) -> List[Dict]:
    """Fetch up to ``limit`` messages for a thread from the backend.

    Returns the API client's result, or an empty list when the result is
    falsy or the call raises (the error is logged).
    """
    try:
        history = await api_client.get_conversation_history(thread_id, limit)
        if not history:
            return []
        return history
    except Exception as exc:
        print(f"Error fetching conversation history: {exc}")
        return []
def format_conversation_history(messages: List[Dict]) -> str:
    """Render a thread's messages as one text block for AI prompts.

    Messages are numbered from 1 in the order given; the last one is marked
    ``[CURRENT MESSAGE]`` and the others ``[MESSAGE n]``.

    Args:
        messages: Message dicts. The keys read are ``sender_email``,
            ``recipient_email``, ``subject``, ``timestamp``, ``direction``
            and ``content``.

    Returns:
        The formatted history, or "" when ``messages`` is empty.
    """
    if not messages:
        return ""

    total = len(messages)
    # Collect the pieces and join once instead of repeated string +=
    sections = ["CONVERSATION HISTORY:\n"]
    for position, message in enumerate(messages, 1):
        marker = "[CURRENT MESSAGE]" if position == total else f"[MESSAGE {position}]"
        # `direction` may be present with a None value; .get()'s default only
        # covers a missing key, so coalesce before calling .upper().
        direction = (message.get("direction") or "").upper()
        sections.append(f"""
--- {marker} ---
From: {message.get('sender_email')}
To: {message.get('recipient_email')}
Subject: {message.get('subject')}
Date: {message.get('timestamp')}
Direction: {direction}
{message.get('content')}
""")
    return "".join(sections)
# Email Processing Functions (same as original)
def extract_email_address(from_header: str) -> str:
    """Pull the address out of a 'From'-style header value.

    Returns the text inside ``<...>`` or the first bare ``x@y`` token,
    whichever appears first in the header. Without either, the stripped
    header is returned unchanged.
    """
    found = re.search(r"<([^>]+)>|([^\s<>]+@[^\s<>]+)", from_header)
    if found is None:
        return from_header.strip()
    bracketed, bare = found.groups()
    return bracketed or bare
def normalize_subject_for_thread(subject: str) -> str:
    """Normalize an email subject for thread_id generation.

    Strips every leading reply/forward prefix ("Re:", "Fwd:", "Fw:", in any
    case and any combination, e.g. "RE: Fwd: Re: ...") and collapses runs of
    whitespace to single spaces. All stacked prefixes are removed so that
    each message of a thread normalizes to the same subject.

    Args:
        subject: Raw subject line; may be empty or None.

    Returns:
        The normalized subject, or "" for empty input.
    """
    if not subject:
        return ""

    # ^\s* lets leading whitespace precede the prefixes; the outer (...)+
    # removes stacked prefixes in one pass.
    without_prefixes = re.sub(
        r"^\s*(?:(?:re|fwd|fw):\s*)+", "", subject, flags=re.IGNORECASE
    )

    # split()/join trims the ends and collapses internal whitespace
    return " ".join(without_prefixes.split())
def generate_thread_id(sender_email: str, recipient_email: str, subject: str) -> str:
    """
    Derive a stable thread_id from the two addresses and the normalized subject.

    The addresses are lower-cased, stripped and sorted, so the id is the same
    whichever party is the sender. The result is the first 16 hex characters
    of a SHA-256 digest, followed by "-" and a short alphanumeric subject
    prefix when the subject provides one.
    """
    import hashlib

    # Direction-independent participant pair
    participants = sorted(
        address.lower().strip() for address in (sender_email, recipient_email)
    )
    topic = normalize_subject_for_thread(subject)

    # Hash "a|b|subject" for a fixed-length id free of special characters
    digest = hashlib.sha256(
        "|".join([*participants, topic]).encode("utf-8")
    ).hexdigest()[:16]

    # Readable suffix: alphanumerics from the first 10 subject characters
    slug = "".join(filter(str.isalnum, topic[:10])).lower()
    return f"{digest}-{slug}" if slug else digest
def parse_ses_event(event: Dict[str, Any]) -> EmailEvent:
    """Parse an SES receipt event into an ``EmailEvent``.

    Only the first record of the event is read. The sender and the first
    recipient are both reduced to bare addresses, so a display name in
    either header cannot change the generated thread_id. A missing subject
    is treated as "".

    Args:
        event: Lambda event carrying ``Records[0].ses.mail``.

    Returns:
        An ``EmailEvent`` with empty ``content`` (the body is loaded from S3
        separately) and ``s3_key`` set to ``"emails/" + message_id``.

    Raises:
        Exception: Any error while reading the event is logged and re-raised.
    """
    try:
        # Extract SES mail object
        mail = event["Records"][0]["ses"]["mail"]
        headers = mail["commonHeaders"]

        # Extract basic info
        message_id = mail["messageId"]
        timestamp = datetime.fromisoformat(mail["timestamp"].replace("Z", "+00:00"))

        # Sender and recipient, normalized the same way
        sender_email = extract_email_address(headers["from"][0])
        recipient_email = extract_email_address(headers["to"][0])
        # A mail without a Subject header should not abort parsing
        subject = headers.get("subject") or ""

        # Generate consistent thread_id based on sender, recipient, and normalized subject
        thread_id = generate_thread_id(sender_email, recipient_email, subject)
        print(
            f"Generated thread_id: {thread_id} for email from {sender_email} to {recipient_email} with subject: {subject}"
        )

        return EmailEvent(
            message_id=message_id,
            thread_id=thread_id,
            sender_email=sender_email,
            recipient_email=recipient_email,
            subject=subject,
            content="",  # Will be filled from S3
            timestamp=timestamp,
            s3_bucket=S3_BUCKET,
            s3_key="emails/" + message_id,
        )
    except Exception as e:
        print(f"Error parsing SES event: {e}")
        raise
def _decode_mime_part(part) -> str:
    """Decode one MIME part's payload to text using its declared charset."""
    payload = part.get_payload(decode=True)
    if not isinstance(payload, bytes):
        # Container parts (multipart/*) have no payload of their own
        return ""
    charset = part.get_content_charset() or "utf-8"
    try:
        return payload.decode(charset, errors="replace")
    except LookupError:
        # Unknown or misspelled charset label: fall back to UTF-8
        return payload.decode("utf-8", errors="replace")


def fetch_email_from_s3(bucket: str, key: str) -> str:
    """Fetch a raw email from S3 and return its text content.

    The message is parsed from bytes, so mail that is not valid UTF-8 no
    longer aborts the whole fetch, and each part is decoded with its own
    declared charset. For multipart mail, parts marked as attachments are
    skipped. ``text/plain`` parts are preferred; if they yield nothing, any
    ``text/*`` part is used (or, for non-multipart mail, the payload
    whatever its type).

    Args:
        bucket: S3 bucket holding the raw message.
        key: S3 object key of the raw message.

    Returns:
        The stripped text content, or "" when nothing could be extracted or
        an error occurred (the error is logged).
    """
    try:
        response = s3_client.get_object(Bucket=bucket, Key=key)
        raw_message = response["Body"].read()

        # Parse from bytes; decoding is deferred to the individual parts
        msg = email.message_from_bytes(raw_message)

        multipart = msg.is_multipart()
        if multipart:
            candidates = [
                part
                for part in msg.walk()
                if part.get_content_disposition() != "attachment"
            ]
        else:
            candidates = [msg]

        # Preferred: plain-text parts
        text_content = "".join(
            _decode_mime_part(part)
            for part in candidates
            if part.get_content_type() == "text/plain"
        )

        # Fallback: any text part (any payload for non-multipart mail)
        if not text_content.strip():
            text_content = "".join(
                _decode_mime_part(part)
                for part in candidates
                if not multipart or part.get_content_type().startswith("text/")
            )

        return text_content.strip()
    except Exception as e:
        print(f"Error fetching email from S3: {e}")
        return ""
def send_ses_reply(
    to_email: str,
    from_email: str,
    subject: str,
    body: str,
    user: User,
    in_reply_to_id: Optional[str] = None,
) -> bool:
    """Send a plain-text reply through SES.

    A "Best regards" signature is appended unless the body already ends with
    it. When the user's profile has no name, the signature is just
    "Best regards," rather than a literal "None".

    Args:
        to_email: Recipient address.
        from_email: Address used as the SES ``Source``.
        subject: Subject line.
        body: Reply text.
        user: Account whose ``profile.name`` signs the message.
        in_reply_to_id: Accepted for interface compatibility; not used by
            this function.

    Returns:
        True when SES accepted the message, False on any error (logged).
    """
    try:
        # Add signature to body if not already present
        sender_name = user.profile.name
        signature = "Best regards," + (f"\n{sender_name}" if sender_name else "")
        if body.rstrip().endswith(signature):
            body_with_signature = body
        else:
            body_with_signature = f"{body}\n\n{signature}"

        # Prepare email
        message = {
            "Subject": {"Data": subject, "Charset": "UTF-8"},
            "Body": {"Text": {"Data": body_with_signature, "Charset": "UTF-8"}},
        }

        # Send email
        response = ses_client.send_email(
            Source=from_email, Destination={"ToAddresses": [to_email]}, Message=message
        )
        print(f"Email sent successfully. Message ID: {response['MessageId']}")
        return True
    except Exception as e:
        print(f"Error sending SES reply: {e}")
        return False
# AI Agents (same as original)
deal_filter_agent = Agent(
name="Deal Filter Agent",
instructions="""
You are an Email Deal Filter for a content creator's inbox.
Your job is to classify each email into one of three categories:
1. Sponsorship / Revenue Opportunity
- Paid Sponsorship (flat rate)
- Affiliate Offer
- Performance / Hybrid (MG + performance, CPA, CPC, CPM)
- Product Seeding / Gifting
- UGC / Freelance Content Request (content for brand-owned channels)
2. Non-Sponsorship / Business
- Events/Press invites, equity partnerships, collaboration with another creator, casting/appearance request, speaking opportunities
3. Other (ignore)
- Fan mail, donations, platform notices, agencies recruiting, feedback, spam, newsletters, cold irrelevant outreach, system alerts, job offers, recruiting.
Rules:
- If any budget/payment language is present, prefer category (1).
- If gifting/product only, choose sponsorship → Gifting.
- If multiple apply, pick the highest ID category in Sponsorship (flat > affiliate > hybrid > gifting > ugc).
- If no clear business opportunity, classify as Other.