prospect-research-integration

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Prospect Research Integration

潜在客户调研集成

You are an expert in building sales bots that automatically research and enrich prospect data from multiple sources. Your goal is to help developers create systems that gather intelligence to personalize outreach and qualify leads.
您是构建销售机器人的专家,这类机器人可自动从多个来源调研并丰富潜在客户数据。您的目标是帮助开发者创建能够收集情报以实现个性化触达和销售线索筛选的系统。

Why Research Integration Matters

调研集成的重要性

The Data Gap

数据缺口

What you have:
- Name: John Smith
- Email: john@company.com
- Company: Acme Corp

What you need to personalize:
- Role and responsibilities
- Company initiatives
- Recent news/triggers
- Tech stack
- Pain points
- Budget authority
What you have:
- Name: John Smith
- Email: john@company.com
- Company: Acme Corp

What you need to personalize:
- Role and responsibilities
- Company initiatives
- Recent news/triggers
- Tech stack
- Pain points
- Budget authority

With Research Integration

集成调研后的效果

Auto-enriched profile:
- Title: VP of Engineering
- Reports to: CTO
- Team size: 45 engineers
- Recent: Announced Series B
- Tech: AWS, React, Python
- Hiring: 12 open roles
- Trigger: New product launch

Now you can personalize.
Auto-enriched profile:
- Title: VP of Engineering
- Reports to: CTO
- Team size: 45 engineers
- Recent: Announced Series B
- Tech: AWS, React, Python
- Hiring: 12 open roles
- Trigger: New product launch

Now you can personalize.

Data Source Integration

数据源集成

Source Configuration

数据源配置

python
class DataSourceManager:
    """Registry of enrichment data sources with cache-aware lookups.

    Attributes:
        sources: Maps a source id to its normalized settings (see
            ``register_source``).
        priority_order: Starts as an empty list; nothing in this class
            reads it.
        cache: ``ResearchCache`` consulted before any fresh fetch.

    NOTE(review): ``lookup`` calls ``self.find_best_source`` and
    ``self.fetch_from_source``, neither of which is defined in this class
    body -- presumably supplied by a subclass or elsewhere; confirm.
    """

    def __init__(self):
        self.sources = {}
        self.priority_order = []
        self.cache = ResearchCache()

    def register_source(self, source_id, source_config):
        """Store the settings for one data source under ``source_id``.

        Args:
            source_id: Key under which the source is stored.
            source_config: Mapping with required keys ``"connector"`` and
                ``"data_types"``; optional ``"rate_limit"``, ``"cost"``
                (or ``"cost_per_lookup"``) and ``"reliability"``.

        Raises:
            KeyError: If ``"connector"`` or ``"data_types"`` is missing.
        """
        # The price was only ever read from "cost", yet it is stored (and
        # commonly supplied) as "cost_per_lookup". Accept both spellings,
        # preferring "cost" so existing callers behave as before.
        cost = source_config.get(
            "cost", source_config.get("cost_per_lookup", 0)
        )
        self.sources[source_id] = {
            "connector": source_config["connector"],
            "data_types": source_config["data_types"],
            "rate_limit": source_config.get("rate_limit"),
            "cost_per_lookup": cost,
            "reliability": source_config.get("reliability", 0.8),
        }

    def lookup(self, identifier, data_types_needed):
        """Collect the requested data types for ``identifier``.

        For each data type the best source is asked for; a non-stale
        cache entry is used when available, otherwise the source is
        queried and a non-empty answer is cached.

        Args:
            identifier: Value identifying the prospect or company.
            data_types_needed: Sized iterable of data type names.

        Returns:
            dict with ``"data"`` (data type -> payload), ``"sources_used"``
            (sources that were freshly queried; cache hits are not
            listed) and ``"completeness"`` (fraction of requested types
            that produced data, ``0.0`` when nothing was requested).
        """
        results = {}
        sources_used = []

        for data_type in data_types_needed:
            source = self.find_best_source(data_type)
            if not source:
                continue

            cached = self.cache.get(identifier, data_type, source)
            if cached and not self._is_stale(cached):
                results[data_type] = cached.data
                continue

            data = self.fetch_from_source(source, identifier, data_type)
            if data:
                results[data_type] = data
                self.cache.set(identifier, data_type, source, data)
                sources_used.append(source)

        # Guard the ratio: an empty request used to raise ZeroDivisionError.
        requested = len(data_types_needed)
        completeness = len(results) / requested if requested else 0.0

        return {
            "data": results,
            "sources_used": sources_used,
            "completeness": completeness,
        }

    @staticmethod
    def _is_stale(cached):
        """Read staleness from a cache result as a method or an attribute."""
        # ResearchCache.get builds its result with ``is_stale=<bool>``, so
        # calling it unconditionally would fail on a plain boolean field.
        stale = cached.is_stale
        return stale() if callable(stale) else bool(stale)
python
class DataSourceManager:
    """Registry of enrichment data sources with cache-aware lookups.

    Attributes:
        sources: Maps a source id to its normalized settings (see
            ``register_source``).
        priority_order: Starts as an empty list; nothing in this class
            reads it.
        cache: ``ResearchCache`` consulted before any fresh fetch.

    NOTE(review): ``lookup`` calls ``self.find_best_source`` and
    ``self.fetch_from_source``, neither of which is defined in this class
    body -- presumably supplied by a subclass or elsewhere; confirm.
    """

    def __init__(self):
        self.sources = {}
        self.priority_order = []
        self.cache = ResearchCache()

    def register_source(self, source_id, source_config):
        """Store the settings for one data source under ``source_id``.

        Args:
            source_id: Key under which the source is stored.
            source_config: Mapping with required keys ``"connector"`` and
                ``"data_types"``; optional ``"rate_limit"``, ``"cost"``
                (or ``"cost_per_lookup"``) and ``"reliability"``.

        Raises:
            KeyError: If ``"connector"`` or ``"data_types"`` is missing.
        """
        # The price was only ever read from "cost", yet it is stored (and
        # commonly supplied) as "cost_per_lookup". Accept both spellings,
        # preferring "cost" so existing callers behave as before.
        cost = source_config.get(
            "cost", source_config.get("cost_per_lookup", 0)
        )
        self.sources[source_id] = {
            "connector": source_config["connector"],
            "data_types": source_config["data_types"],
            "rate_limit": source_config.get("rate_limit"),
            "cost_per_lookup": cost,
            "reliability": source_config.get("reliability", 0.8),
        }

    def lookup(self, identifier, data_types_needed):
        """Collect the requested data types for ``identifier``.

        For each data type the best source is asked for; a non-stale
        cache entry is used when available, otherwise the source is
        queried and a non-empty answer is cached.

        Args:
            identifier: Value identifying the prospect or company.
            data_types_needed: Sized iterable of data type names.

        Returns:
            dict with ``"data"`` (data type -> payload), ``"sources_used"``
            (sources that were freshly queried; cache hits are not
            listed) and ``"completeness"`` (fraction of requested types
            that produced data, ``0.0`` when nothing was requested).
        """
        results = {}
        sources_used = []

        for data_type in data_types_needed:
            source = self.find_best_source(data_type)
            if not source:
                continue

            cached = self.cache.get(identifier, data_type, source)
            if cached and not self._is_stale(cached):
                results[data_type] = cached.data
                continue

            data = self.fetch_from_source(source, identifier, data_type)
            if data:
                results[data_type] = data
                self.cache.set(identifier, data_type, source, data)
                sources_used.append(source)

        # Guard the ratio: an empty request used to raise ZeroDivisionError.
        requested = len(data_types_needed)
        completeness = len(results) / requested if requested else 0.0

        return {
            "data": results,
            "sources_used": sources_used,
            "completeness": completeness,
        }

    @staticmethod
    def _is_stale(cached):
        """Read staleness from a cache result as a method or an attribute."""
        # ResearchCache.get builds its result with ``is_stale=<bool>``, so
        # calling it unconditionally would fail on a plain boolean field.
        stale = cached.is_stale
        return stale() if callable(stale) else bool(stale)

Source definitions

Source definitions

# Shared manager used by the research helpers below.
source_manager = DataSourceManager()

# The per-lookup price is passed as "cost": that is the key
# DataSourceManager.register_source() reads. Supplying it as
# "cost_per_lookup" left every source registered with a cost of 0.
source_manager.register_source(
    "clearbit",
    {
        "connector": ClearbitConnector(),
        "data_types": ["company_info", "contact_info", "tech_stack"],
        "rate_limit": 100,
        "cost": 0.10,
    },
)
source_manager.register_source(
    "linkedin_api",
    {
        "connector": LinkedInConnector(),
        "data_types": ["contact_info", "work_history", "connections"],
        "rate_limit": 50,
        "cost": 0.05,
    },
)
source_manager.register_source(
    "news_api",
    {
        "connector": NewsAPIConnector(),
        "data_types": ["company_news", "press_releases"],
        "rate_limit": 1000,
        "cost": 0.01,
    },
)
undefined
# Shared manager used by the research helpers below.
source_manager = DataSourceManager()

# The per-lookup price is passed as "cost": that is the key
# DataSourceManager.register_source() reads. Supplying it as
# "cost_per_lookup" left every source registered with a cost of 0.
source_manager.register_source(
    "clearbit",
    {
        "connector": ClearbitConnector(),
        "data_types": ["company_info", "contact_info", "tech_stack"],
        "rate_limit": 100,
        "cost": 0.10,
    },
)
source_manager.register_source(
    "linkedin_api",
    {
        "connector": LinkedInConnector(),
        "data_types": ["contact_info", "work_history", "connections"],
        "rate_limit": 50,
        "cost": 0.05,
    },
)
source_manager.register_source(
    "news_api",
    {
        "connector": NewsAPIConnector(),
        "data_types": ["company_news", "press_releases"],
        "rate_limit": 1000,
        "cost": 0.01,
    },
)
undefined

Company Research

企业调研

python
def research_company(company_name, domain=None):
    """Assemble a company intelligence report from several lookups.

    Args:
        company_name: Company name handed to the financial, news and
            job-posting helpers.
        domain: Optional company domain. Preferred over the name for the
            basic-info lookup and passed unchanged to tech-stack
            detection and job scraping.

    Returns:
        dict with the keys ``basic_info``, ``financials``, ``tech_stack``,
        ``news``, ``hiring`` and ``social``. ``social`` is always an empty
        dict; nothing here fills it.
    """
    # Firmographics: key the lookup on the domain when one is known.
    lookup_key = domain or company_name
    basic_lookup = source_manager.lookup(
        lookup_key, ["company_size", "industry", "location", "founded"]
    )
    basic_info = basic_lookup["data"]

    # Funding / revenue / growth hints.
    fin_signals = gather_financial_signals(company_name)
    financials = {
        "funding": fin_signals.get("recent_funding"),
        "revenue_estimate": fin_signals.get("revenue"),
        "growth_signals": fin_signals.get("growth_indicators"),
    }

    # Technologies in use, raw plus two derived views.
    technologies = detect_tech_stack(domain)["technologies"]
    tech_stack = {
        "detected": technologies,
        "categories": categorize_tech(technologies),
        "relevant_to_us": filter_relevant_tech(technologies),
    }

    # News, narrowed by the helper using a 90-day setting.
    news_lookup = source_manager.lookup(company_name, ["company_news"])
    recent_news = extract_relevant_news(news_lookup["data"], days=90)

    # Open positions as a proxy for where the company is investing.
    postings = scrape_job_postings(company_name, domain)
    hiring = {
        "total_openings": len(postings),
        "by_department": group_by_department(postings),
        "growth_areas": identify_growth_areas(postings),
        "relevant_roles": filter_relevant_roles(postings),
    }

    return {
        "basic_info": basic_info,
        "financials": financials,
        "tech_stack": tech_stack,
        "news": recent_news,
        "hiring": hiring,
        "social": {},
    }

def gather_financial_signals(company_name):
    """Collect funding and corporate-event signals for a company.

    Args:
        company_name: Name passed to both external lookups.

    Returns:
        dict that always has ``"corporate_events"`` (whatever the filings
        search returned) and has ``"recent_funding"`` only when the
        funding lookup returned something truthy.
    """
    collected = {}

    # Latest funding round, if the lookup finds one.
    round_info = lookup_crunchbase(company_name)
    if round_info:
        collected["recent_funding"] = dict(
            amount=round_info.amount,
            round=round_info.round_type,
            date=round_info.date,
            investors=round_info.investors,
        )

    # IPO / acquisition related filings.
    collected["corporate_events"] = search_sec_filings(company_name)

    return collected
python
def research_company(company_name, domain=None):
    """Assemble a company intelligence report from several lookups.

    Args:
        company_name: Company name handed to the financial, news and
            job-posting helpers.
        domain: Optional company domain. Preferred over the name for the
            basic-info lookup and passed unchanged to tech-stack
            detection and job scraping.

    Returns:
        dict with the keys ``basic_info``, ``financials``, ``tech_stack``,
        ``news``, ``hiring`` and ``social``. ``social`` is always an empty
        dict; nothing here fills it.
    """
    # Firmographics: key the lookup on the domain when one is known.
    lookup_key = domain or company_name
    basic_lookup = source_manager.lookup(
        lookup_key, ["company_size", "industry", "location", "founded"]
    )
    basic_info = basic_lookup["data"]

    # Funding / revenue / growth hints.
    fin_signals = gather_financial_signals(company_name)
    financials = {
        "funding": fin_signals.get("recent_funding"),
        "revenue_estimate": fin_signals.get("revenue"),
        "growth_signals": fin_signals.get("growth_indicators"),
    }

    # Technologies in use, raw plus two derived views.
    technologies = detect_tech_stack(domain)["technologies"]
    tech_stack = {
        "detected": technologies,
        "categories": categorize_tech(technologies),
        "relevant_to_us": filter_relevant_tech(technologies),
    }

    # News, narrowed by the helper using a 90-day setting.
    news_lookup = source_manager.lookup(company_name, ["company_news"])
    recent_news = extract_relevant_news(news_lookup["data"], days=90)

    # Open positions as a proxy for where the company is investing.
    postings = scrape_job_postings(company_name, domain)
    hiring = {
        "total_openings": len(postings),
        "by_department": group_by_department(postings),
        "growth_areas": identify_growth_areas(postings),
        "relevant_roles": filter_relevant_roles(postings),
    }

    return {
        "basic_info": basic_info,
        "financials": financials,
        "tech_stack": tech_stack,
        "news": recent_news,
        "hiring": hiring,
        "social": {},
    }

def gather_financial_signals(company_name):
    """Collect funding and corporate-event signals for a company.

    Args:
        company_name: Name passed to both external lookups.

    Returns:
        dict that always has ``"corporate_events"`` (whatever the filings
        search returned) and has ``"recent_funding"`` only when the
        funding lookup returned something truthy.
    """
    collected = {}

    # Latest funding round, if the lookup finds one.
    round_info = lookup_crunchbase(company_name)
    if round_info:
        collected["recent_funding"] = dict(
            amount=round_info.amount,
            round=round_info.round_type,
            date=round_info.date,
            investors=round_info.investors,
        )

    # IPO / acquisition related filings.
    collected["corporate_events"] = search_sec_filings(company_name)

    return collected

Contact Research

联系人调研

python
def research_contact(email=None, name=None, company=None, linkedin_url=None):
    """Gather professional, activity and connection data for one contact.

    Args:
        email: Contact email; preferred identifier for provider enrichment.
        name: Contact name. Accepted for interface compatibility; not
            used by this function.
        company: Contact's company. Connection research runs only when
            this is set.
        linkedin_url: LinkedIn profile URL. Required for the profile,
            activity and connection sections; fallback enrichment
            identifier when no email is given.

    Returns:
        dict with the keys ``professional``, ``social``, ``activity`` and
        ``connections``. Sections whose inputs are missing stay empty
        dicts; ``social`` is never filled here.
    """
    research = {
        "professional": {},
        "social": {},
        "activity": {},
        "connections": {},
    }

    # Professional info from the LinkedIn profile.
    if linkedin_url:
        linkedin_data = scrape_linkedin_profile(linkedin_url)
        research["professional"] = {
            "current_role": linkedin_data.get("title"),
            "tenure": calculate_tenure(linkedin_data.get("start_date")),
            "previous_roles": linkedin_data.get("experience", [])[:3],
            "education": linkedin_data.get("education"),
            "skills": linkedin_data.get("skills", [])[:10],
        }

    # Enrich from data providers -- only with a real identifier. Looking
    # up ``None`` would file every identifier-less contact under the same
    # cache key and hand one prospect's data to another.
    identifier = email or linkedin_url
    if identifier:
        enriched = source_manager.lookup(
            identifier,
            ["contact_info", "work_history"],
        )
        research["professional"].update(enriched.get("data", {}))

    # Social activity.
    if linkedin_url:
        activity = get_linkedin_activity(linkedin_url)
        research["activity"] = {
            "recent_posts": activity.get("posts", [])[:5],
            "engagement_topics": extract_topics(activity),
            "content_style": analyze_content_style(activity),
        }

    # Mutual connections: every helper below takes the LinkedIn URL, so
    # the section needs it in addition to the company.
    if company and linkedin_url:
        research["connections"] = {
            "mutual_connections": find_mutual_connections(linkedin_url),
            "shared_groups": find_shared_groups(linkedin_url),
            "common_background": find_common_background(linkedin_url),
        }

    return research
python
def research_contact(email=None, name=None, company=None, linkedin_url=None):
    """Gather professional, activity and connection data for one contact.

    Args:
        email: Contact email; preferred identifier for provider enrichment.
        name: Contact name. Accepted for interface compatibility; not
            used by this function.
        company: Contact's company. Connection research runs only when
            this is set.
        linkedin_url: LinkedIn profile URL. Required for the profile,
            activity and connection sections; fallback enrichment
            identifier when no email is given.

    Returns:
        dict with the keys ``professional``, ``social``, ``activity`` and
        ``connections``. Sections whose inputs are missing stay empty
        dicts; ``social`` is never filled here.
    """
    research = {
        "professional": {},
        "social": {},
        "activity": {},
        "connections": {},
    }

    # Professional info from the LinkedIn profile.
    if linkedin_url:
        linkedin_data = scrape_linkedin_profile(linkedin_url)
        research["professional"] = {
            "current_role": linkedin_data.get("title"),
            "tenure": calculate_tenure(linkedin_data.get("start_date")),
            "previous_roles": linkedin_data.get("experience", [])[:3],
            "education": linkedin_data.get("education"),
            "skills": linkedin_data.get("skills", [])[:10],
        }

    # Enrich from data providers -- only with a real identifier. Looking
    # up ``None`` would file every identifier-less contact under the same
    # cache key and hand one prospect's data to another.
    identifier = email or linkedin_url
    if identifier:
        enriched = source_manager.lookup(
            identifier,
            ["contact_info", "work_history"],
        )
        research["professional"].update(enriched.get("data", {}))

    # Social activity.
    if linkedin_url:
        activity = get_linkedin_activity(linkedin_url)
        research["activity"] = {
            "recent_posts": activity.get("posts", [])[:5],
            "engagement_topics": extract_topics(activity),
            "content_style": analyze_content_style(activity),
        }

    # Mutual connections: every helper below takes the LinkedIn URL, so
    # the section needs it in addition to the company.
    if company and linkedin_url:
        research["connections"] = {
            "mutual_connections": find_mutual_connections(linkedin_url),
            "shared_groups": find_shared_groups(linkedin_url),
            "common_background": find_common_background(linkedin_url),
        }

    return research

Trigger Event Detection

触发事件检测

Event Monitoring

事件监控

python
class TriggerEventMonitor:
    """Sets up account monitors and turns detected events into triggers.

    Attributes:
        event_types: Names of the trigger categories this monitor lists.
    """

    def __init__(self):
        self.event_types = [
            "funding_round",
            "executive_hire",
            "product_launch",
            "expansion",
            "acquisition",
            "partnership",
            "award",
            "earnings",
        ]

    def monitor_account(self, company, domain):
        """Create news, job-posting and LinkedIn monitors for an account.

        Args:
            company: Company name used in all three monitors.
            domain: Company domain for the news query; may be falsy, in
                which case the query searches on the name only.

        Returns:
            Whatever ``create_monitors`` returns for the monitor list.
        """
        # Only add the site: clause when a domain is known; formatting a
        # missing domain produced the meaningless term "site:None".
        news_query = f'"{company}"'
        if domain:
            news_query += f" OR site:{domain}"

        monitors = [
            # News monitoring
            {
                "type": "news",
                "query": news_query,
                "frequency": "daily",
            },
            # Job posting monitoring
            {
                "type": "jobs",
                "company": company,
                "frequency": "weekly",
            },
            # LinkedIn monitoring (no frequency is set for this one)
            {
                "type": "linkedin_company",
                "company": company,
                "track": ["posts", "employee_changes"],
            },
        ]

        return create_monitors(monitors)

    def process_event(self, event, account):
        """Classify and score an event, returning a trigger if relevant.

        Args:
            event: Detected event, passed to the classifier and scorer.
            account: Account the event belongs to; its ``id`` is recorded.

        Returns:
            A trigger dict when the relevance score is above 0.6,
            otherwise ``None``. Scores above 0.8 also send an alert.
        """
        event_type = classify_event(event)
        relevance = score_event_relevance(event, account)

        if relevance <= 0.6:
            return None

        trigger = {
            "account_id": account.id,
            "event_type": event_type,
            "event_data": event,
            "relevance_score": relevance,
            "detected_at": datetime.now(),
            "recommended_action": get_recommended_action(event_type),
        }

        # Alert for high-priority triggers.
        if relevance > 0.8:
            send_trigger_alert(trigger)

        return trigger


def classify_event(event):
    """Classify an event into a trigger category by keyword patterns.

    Categories are tried in the order listed below and the first one with
    a matching pattern wins. Matching is case-insensitive.

    Args:
        event: Object with a ``text`` attribute holding the event text.

    Returns:
        One of ``"funding_round"``, ``"executive_hire"``,
        ``"product_launch"``, ``"expansion"``, or ``"other"`` when nothing
        matches or the event has no text.
    """
    patterns = {
        "funding_round": [
            r"raised.*\$\d+",
            r"series [a-e]",
            r"funding round",
            r"investment from",
        ],
        "executive_hire": [
            r"(hired|appointed|named).*(?:CEO|CTO|VP|Director)",
            r"joins as",
            r"new (?:CEO|CTO|VP)",
        ],
        "product_launch": [
            r"launched",
            r"announces.*product",
            r"introduces",
            r"releases",
        ],
        "expansion": [
            r"expands to",
            r"opens.*office",
            r"enters.*market",
            r"international expansion",
        ],
    }

    # re.search raises TypeError on None; an event without text simply
    # cannot match any category.
    text = event.text
    if not text:
        return "other"

    for event_type, event_patterns in patterns.items():
        if any(re.search(p, text, re.IGNORECASE) for p in event_patterns):
            return event_type

    return "other"
python
class TriggerEventMonitor:
    """Sets up account monitors and turns detected events into triggers.

    Attributes:
        event_types: Names of the trigger categories this monitor lists.
    """

    def __init__(self):
        self.event_types = [
            "funding_round",
            "executive_hire",
            "product_launch",
            "expansion",
            "acquisition",
            "partnership",
            "award",
            "earnings",
        ]

    def monitor_account(self, company, domain):
        """Create news, job-posting and LinkedIn monitors for an account.

        Args:
            company: Company name used in all three monitors.
            domain: Company domain for the news query; may be falsy, in
                which case the query searches on the name only.

        Returns:
            Whatever ``create_monitors`` returns for the monitor list.
        """
        # Only add the site: clause when a domain is known; formatting a
        # missing domain produced the meaningless term "site:None".
        news_query = f'"{company}"'
        if domain:
            news_query += f" OR site:{domain}"

        monitors = [
            # News monitoring
            {
                "type": "news",
                "query": news_query,
                "frequency": "daily",
            },
            # Job posting monitoring
            {
                "type": "jobs",
                "company": company,
                "frequency": "weekly",
            },
            # LinkedIn monitoring (no frequency is set for this one)
            {
                "type": "linkedin_company",
                "company": company,
                "track": ["posts", "employee_changes"],
            },
        ]

        return create_monitors(monitors)

    def process_event(self, event, account):
        """Classify and score an event, returning a trigger if relevant.

        Args:
            event: Detected event, passed to the classifier and scorer.
            account: Account the event belongs to; its ``id`` is recorded.

        Returns:
            A trigger dict when the relevance score is above 0.6,
            otherwise ``None``. Scores above 0.8 also send an alert.
        """
        event_type = classify_event(event)
        relevance = score_event_relevance(event, account)

        if relevance <= 0.6:
            return None

        trigger = {
            "account_id": account.id,
            "event_type": event_type,
            "event_data": event,
            "relevance_score": relevance,
            "detected_at": datetime.now(),
            "recommended_action": get_recommended_action(event_type),
        }

        # Alert for high-priority triggers.
        if relevance > 0.8:
            send_trigger_alert(trigger)

        return trigger


def classify_event(event):
    """Classify an event into a trigger category by keyword patterns.

    Categories are tried in the order listed below and the first one with
    a matching pattern wins. Matching is case-insensitive.

    Args:
        event: Object with a ``text`` attribute holding the event text.

    Returns:
        One of ``"funding_round"``, ``"executive_hire"``,
        ``"product_launch"``, ``"expansion"``, or ``"other"`` when nothing
        matches or the event has no text.
    """
    patterns = {
        "funding_round": [
            r"raised.*\$\d+",
            r"series [a-e]",
            r"funding round",
            r"investment from",
        ],
        "executive_hire": [
            r"(hired|appointed|named).*(?:CEO|CTO|VP|Director)",
            r"joins as",
            r"new (?:CEO|CTO|VP)",
        ],
        "product_launch": [
            r"launched",
            r"announces.*product",
            r"introduces",
            r"releases",
        ],
        "expansion": [
            r"expands to",
            r"opens.*office",
            r"enters.*market",
            r"international expansion",
        ],
    }

    # re.search raises TypeError on None; an event without text simply
    # cannot match any category.
    text = event.text
    if not text:
        return "other"

    for event_type, event_patterns in patterns.items():
        if any(re.search(p, text, re.IGNORECASE) for p in event_patterns):
            return event_type

    return "other"

Data Synthesis

数据合成

Profile Builder

潜在客户档案构建器

python
class ProspectProfileBuilder:
    """Combines company, contact and trigger research into one profile.

    Attributes:
        research_sources: Starts as an empty list; nothing in this class
            reads it.
    """

    def __init__(self):
        self.research_sources = []

    def build_profile(self, prospect_input):
        """Build a prospect profile from the given input record.

        Args:
            prospect_input: Object exposing ``company``, ``domain``,
                ``email``, ``name`` and ``linkedin`` attributes.

        Returns:
            A ``ProspectProfile`` with ``company``, ``contact``,
            ``triggers``, ``insights`` and ``completeness_score`` set.
        """
        profile = ProspectProfile()

        profile.company = research_company(
            prospect_input.company,
            prospect_input.domain,
        )

        profile.contact = research_contact(
            email=prospect_input.email,
            name=prospect_input.name,
            company=prospect_input.company,
            linkedin_url=prospect_input.linkedin,
        )

        profile.triggers = find_recent_triggers(prospect_input.company)

        # Insights and the score are derived from the sections set above.
        profile.insights = self.synthesize_insights(profile)
        profile.completeness_score = self.calculate_completeness(profile)

        return profile

    def calculate_completeness(self, profile):
        """Return the overall research score for ``profile``.

        ``build_profile`` calls this method, but the class did not define
        it. It delegates to the module-level ``score_research_quality``
        and returns that result's ``"overall"`` entry.
        NOTE(review): confirm a single overall number is what consumers of
        ``completeness_score`` expect.
        """
        return score_research_quality(profile)["overall"]

    def synthesize_insights(self, profile):
        """Derive outreach insights from the researched profile.

        Args:
            profile: Object whose ``contact`` and ``company`` attributes
                are dicts in the shape built by ``build_profile``.

        Returns:
            List of dicts with ``type``, ``insight`` and ``opportunity``
            keys, in the order: new_in_role, high_growth, recent_funding,
            tech_fit. Only insights whose condition holds are included.
        """
        insights = []

        # New in role. Compare against None instead of relying on
        # truthiness: a tenure of 0 months is the newest possible hire
        # and must not be skipped.
        tenure = profile.contact.get("professional", {}).get("tenure")
        if tenure is not None and tenure < 6:
            insights.append({
                "type": "new_in_role",
                "insight": f"Recently started ({tenure} months)",
                "opportunity": "May be evaluating new tools",
            })

        # Growth signals.
        openings = profile.company.get("hiring", {}).get("total_openings", 0)
        if openings > 10:
            insights.append({
                "type": "high_growth",
                "insight": f"{openings} open roles",
                "opportunity": "Scaling team, may need solutions",
            })

        # Funding trigger: only rounds from the last 90 days count.
        funding = profile.company.get("financials", {}).get("funding")
        if funding and days_since(funding["date"]) < 90:
            insights.append({
                "type": "recent_funding",
                "insight": f"Raised {funding['amount']} {funding['round']}",
                "opportunity": "Budget available for new initiatives",
            })

        # Tech stack fit: mention at most three technologies.
        tech = profile.company.get("tech_stack", {}).get("relevant_to_us", [])
        if tech:
            insights.append({
                "type": "tech_fit",
                "insight": f"Using {', '.join(tech[:3])}",
                "opportunity": "Good technical fit",
            })

        return insights
python
class ProspectProfileBuilder:
    """Combines company, contact and trigger research into one profile.

    Attributes:
        research_sources: Starts as an empty list; nothing in this class
            reads it.
    """

    def __init__(self):
        self.research_sources = []

    def build_profile(self, prospect_input):
        """Build a prospect profile from the given input record.

        Args:
            prospect_input: Object exposing ``company``, ``domain``,
                ``email``, ``name`` and ``linkedin`` attributes.

        Returns:
            A ``ProspectProfile`` with ``company``, ``contact``,
            ``triggers``, ``insights`` and ``completeness_score`` set.
        """
        profile = ProspectProfile()

        profile.company = research_company(
            prospect_input.company,
            prospect_input.domain,
        )

        profile.contact = research_contact(
            email=prospect_input.email,
            name=prospect_input.name,
            company=prospect_input.company,
            linkedin_url=prospect_input.linkedin,
        )

        profile.triggers = find_recent_triggers(prospect_input.company)

        # Insights and the score are derived from the sections set above.
        profile.insights = self.synthesize_insights(profile)
        profile.completeness_score = self.calculate_completeness(profile)

        return profile

    def calculate_completeness(self, profile):
        """Return the overall research score for ``profile``.

        ``build_profile`` calls this method, but the class did not define
        it. It delegates to the module-level ``score_research_quality``
        and returns that result's ``"overall"`` entry.
        NOTE(review): confirm a single overall number is what consumers of
        ``completeness_score`` expect.
        """
        return score_research_quality(profile)["overall"]

    def synthesize_insights(self, profile):
        """Derive outreach insights from the researched profile.

        Args:
            profile: Object whose ``contact`` and ``company`` attributes
                are dicts in the shape built by ``build_profile``.

        Returns:
            List of dicts with ``type``, ``insight`` and ``opportunity``
            keys, in the order: new_in_role, high_growth, recent_funding,
            tech_fit. Only insights whose condition holds are included.
        """
        insights = []

        # New in role. Compare against None instead of relying on
        # truthiness: a tenure of 0 months is the newest possible hire
        # and must not be skipped.
        tenure = profile.contact.get("professional", {}).get("tenure")
        if tenure is not None and tenure < 6:
            insights.append({
                "type": "new_in_role",
                "insight": f"Recently started ({tenure} months)",
                "opportunity": "May be evaluating new tools",
            })

        # Growth signals.
        openings = profile.company.get("hiring", {}).get("total_openings", 0)
        if openings > 10:
            insights.append({
                "type": "high_growth",
                "insight": f"{openings} open roles",
                "opportunity": "Scaling team, may need solutions",
            })

        # Funding trigger: only rounds from the last 90 days count.
        funding = profile.company.get("financials", {}).get("funding")
        if funding and days_since(funding["date"]) < 90:
            insights.append({
                "type": "recent_funding",
                "insight": f"Raised {funding['amount']} {funding['round']}",
                "opportunity": "Budget available for new initiatives",
            })

        # Tech stack fit: mention at most three technologies.
        tech = profile.company.get("tech_stack", {}).get("relevant_to_us", [])
        if tech:
            insights.append({
                "type": "tech_fit",
                "insight": f"Using {', '.join(tech[:3])}",
                "opportunity": "Good technical fit",
            })

        return insights

Research Scoring

调研质量评分

python
def score_research_quality(profile):
    """Score how complete and how fresh a prospect's research is.

    Args:
        profile: Object with ``contact`` (dict), ``company`` (dict) and
            ``triggers`` (sequence of dicts carrying a ``"detected_at"``
            datetime) attributes.

    Returns:
        dict with ``"contact"``, ``"company"`` and ``"triggers"`` scores,
        each between 0 and 1, plus ``"overall"``: their weighted sum
        (0.3 / 0.4 / 0.3).
    """

    def has_any(names, *mappings):
        """Return True if any mapping holds a truthy value for any name."""
        return any(m.get(n) for m in mappings for n in names)

    scores = {}

    # Contact completeness. Each tuple lists the accepted spellings of one
    # field: research_contact() stores the job title as "current_role".
    professional = profile.contact.get("professional") or {}
    contact_fields = [
        ("title", "current_role"),
        ("tenure",),
        ("previous_roles",),
        ("linkedin_url",),
    ]
    contact_complete = sum(
        1 for names in contact_fields if has_any(names, professional)
    )
    scores["contact"] = contact_complete / len(contact_fields)

    # Company completeness. research_company() requests "company_size"
    # under "basic_info" and stores funding under "financials", so those
    # locations are checked in addition to the top level.
    company = profile.company
    basic_info = company.get("basic_info") or {}
    financials = company.get("financials") or {}
    company_fields = [
        (("size", "company_size"), (company, basic_info)),
        (("industry",), (company, basic_info)),
        (("tech_stack",), (company, basic_info)),
        (("funding",), (company, basic_info, financials)),
    ]
    company_complete = sum(
        1 for names, places in company_fields if has_any(names, *places)
    )
    scores["company"] = company_complete / len(company_fields)

    # Trigger freshness decays linearly to zero over 90 days.
    if profile.triggers:
        most_recent = max(t["detected_at"] for t in profile.triggers)
        days_old = (datetime.now() - most_recent).days
        # Clamp on both sides: a future-dated trigger (clock skew) has a
        # negative age and would otherwise score above 1.
        scores["triggers"] = min(1.0, max(0.0, 1 - days_old / 90))
    else:
        scores["triggers"] = 0

    # Overall score.
    scores["overall"] = (
        scores["contact"] * 0.3 +
        scores["company"] * 0.4 +
        scores["triggers"] * 0.3
    )

    return scores
python
def score_research_quality(profile):
    """Score how complete and how fresh a prospect's research is.

    Args:
        profile: Object with ``contact`` (dict), ``company`` (dict) and
            ``triggers`` (sequence of dicts carrying a ``"detected_at"``
            datetime) attributes.

    Returns:
        dict with ``"contact"``, ``"company"`` and ``"triggers"`` scores,
        each between 0 and 1, plus ``"overall"``: their weighted sum
        (0.3 / 0.4 / 0.3).
    """

    def has_any(names, *mappings):
        """Return True if any mapping holds a truthy value for any name."""
        return any(m.get(n) for m in mappings for n in names)

    scores = {}

    # Contact completeness. Each tuple lists the accepted spellings of one
    # field: research_contact() stores the job title as "current_role".
    professional = profile.contact.get("professional") or {}
    contact_fields = [
        ("title", "current_role"),
        ("tenure",),
        ("previous_roles",),
        ("linkedin_url",),
    ]
    contact_complete = sum(
        1 for names in contact_fields if has_any(names, professional)
    )
    scores["contact"] = contact_complete / len(contact_fields)

    # Company completeness. research_company() requests "company_size"
    # under "basic_info" and stores funding under "financials", so those
    # locations are checked in addition to the top level.
    company = profile.company
    basic_info = company.get("basic_info") or {}
    financials = company.get("financials") or {}
    company_fields = [
        (("size", "company_size"), (company, basic_info)),
        (("industry",), (company, basic_info)),
        (("tech_stack",), (company, basic_info)),
        (("funding",), (company, basic_info, financials)),
    ]
    company_complete = sum(
        1 for names, places in company_fields if has_any(names, *places)
    )
    scores["company"] = company_complete / len(company_fields)

    # Trigger freshness decays linearly to zero over 90 days.
    if profile.triggers:
        most_recent = max(t["detected_at"] for t in profile.triggers)
        days_old = (datetime.now() - most_recent).days
        # Clamp on both sides: a future-dated trigger (clock skew) has a
        # negative age and would otherwise score above 1.
        scores["triggers"] = min(1.0, max(0.0, 1 - days_old / 90))
    else:
        scores["triggers"] = 0

    # Overall score.
    scores["overall"] = (
        scores["contact"] * 0.3 +
        scores["company"] * 0.4 +
        scores["triggers"] * 0.3
    )

    return scores

Caching & Freshness

缓存与数据新鲜度

Research Cache

调研缓存

python
class ResearchCache:
    """In-memory cache of research results with per-type freshness rules.

    Attributes:
        cache: Maps ``"identifier:data_type:source"`` to a dict holding
            ``data``, ``timestamp`` and ``source``.
        freshness_rules: Maps a data type to the age beyond which a
            cached entry is reported as stale. Types without a rule use
            a 7-day window.
    """

    def __init__(self):
        self.cache = {}
        self.freshness_rules = {
            "company_info": timedelta(days=30),
            "contact_info": timedelta(days=14),
            "tech_stack": timedelta(days=60),
            "news": timedelta(days=1),
            # The news source registers its data types as "company_news"
            # and "press_releases"; the bare "news" rule never matched
            # them, so news silently fell back to the 7-day default.
            "company_news": timedelta(days=1),
            "press_releases": timedelta(days=1),
            "hiring": timedelta(days=7),
        }

    def get(self, identifier, data_type, source):
        """Return the cached entry for the given key, or ``None``.

        Stale entries are still returned; staleness is reported on the
        result so the caller decides whether to refetch.

        Returns:
            A ``CacheResult`` built with ``data``, ``is_stale`` (bool)
            and ``age`` (timedelta), or ``None`` on a cache miss.
        """
        key = f"{identifier}:{data_type}:{source}"
        cached = self.cache.get(key)
        if not cached:
            return None

        freshness_window = self.freshness_rules.get(
            data_type,
            timedelta(days=7),
        )
        # Sample the clock once so ``age`` and ``is_stale`` agree.
        age = datetime.now() - cached["timestamp"]

        return CacheResult(
            data=cached["data"],
            is_stale=age > freshness_window,
            age=age,
        )

    def set(self, identifier, data_type, source, data):
        """Store ``data`` under the given key, stamped with the current time."""
        key = f"{identifier}:{data_type}:{source}"
        self.cache[key] = {
            "data": data,
            "timestamp": datetime.now(),
            "source": source,
        }
python
class ResearchCache:
    """In-memory cache of research results with per-type freshness rules.

    Attributes:
        cache: Maps ``"identifier:data_type:source"`` to a dict holding
            ``data``, ``timestamp`` and ``source``.
        freshness_rules: Maps a data type to the age beyond which a
            cached entry is reported as stale. Types without a rule use
            a 7-day window.
    """

    def __init__(self):
        self.cache = {}
        self.freshness_rules = {
            "company_info": timedelta(days=30),
            "contact_info": timedelta(days=14),
            "tech_stack": timedelta(days=60),
            "news": timedelta(days=1),
            # The news source registers its data types as "company_news"
            # and "press_releases"; the bare "news" rule never matched
            # them, so news silently fell back to the 7-day default.
            "company_news": timedelta(days=1),
            "press_releases": timedelta(days=1),
            "hiring": timedelta(days=7),
        }

    def get(self, identifier, data_type, source):
        """Return the cached entry for the given key, or ``None``.

        Stale entries are still returned; staleness is reported on the
        result so the caller decides whether to refetch.

        Returns:
            A ``CacheResult`` built with ``data``, ``is_stale`` (bool)
            and ``age`` (timedelta), or ``None`` on a cache miss.
        """
        key = f"{identifier}:{data_type}:{source}"
        cached = self.cache.get(key)
        if not cached:
            return None

        freshness_window = self.freshness_rules.get(
            data_type,
            timedelta(days=7),
        )
        # Sample the clock once so ``age`` and ``is_stale`` agree.
        age = datetime.now() - cached["timestamp"]

        return CacheResult(
            data=cached["data"],
            is_stale=age > freshness_window,
            age=age,
        )

    def set(self, identifier, data_type, source, data):
        """Store ``data`` under the given key, stamped with the current time."""
        key = f"{identifier}:{data_type}:{source}"
        self.cache[key] = {
            "data": data,
            "timestamp": datetime.now(),
            "source": source,
        }

Metrics

指标

Research Effectiveness

调研有效性

Track:
- Profile completeness rate
- Data freshness scores
- Source reliability rates
- Cost per enriched lead
- Research to conversion correlation

Optimize for:
- >80% profile completeness
- <7 day average data age
- Cost efficiency across sources
Track:
- Profile completeness rate
- Data freshness scores
- Source reliability rates
- Cost per enriched lead
- Research to conversion correlation

Optimize for:
- >80% profile completeness
- <7 day average data age
- Cost efficiency across sources