From futuresearch
Dispatches AI researchers to classify, rank, score, deduplicate, merge, forecast, and enrich Python dataframes at scale.
npx claudepluginhub futuresearch/futuresearch-python --plugin futuresearch

This skill uses the workspace's default tool permissions.
FutureSearch gives Claude a research team for your data. Use this skill when writing Python code that needs to classify, rank, score, deduplicate, merge, forecast, or enrich dataframes at scale.
Documentation: For detailed guides, case studies, and API reference, see:
- Docs site: futuresearch.ai/docs
- GitHub: github.com/futuresearch/everyrow-sdk
Operations (importable from futuresearch.ops): classify, rank, dedupe, merge, forecast, single_agent, agent_map.

Install with:
pip install futuresearch
If an MCP server is available (futuresearch_classify, futuresearch_rank, etc. tools), you can use it directly without writing Python code. The MCP server operates on uploaded data (via artifact IDs or inline JSON).
To install the MCP server, add to your MCP config:
{
  "mcpServers": {
    "futuresearch": {
      "type": "http",
      "url": "https://mcp.futuresearch.ai/mcp"
    }
  }
}
Config file locations:
- ~/.claude.json (user) or .mcp.json (project)
- ~/Library/Application Support/Claude/claude_desktop_config.json (macOS)
- ~/.cursor/mcp.json

Use MCP tools when the server is configured and you want to operate on uploaded data directly, without writing Python code. Use the Python SDK when writing Python scripts or pipelines around dataframes.
If you have the FutureSearch MCP server configured, these 18 tools are available. All data processing tools accept input via artifact_id (from upload_data or request_upload_url) or data (inline JSON rows). Provide exactly one.
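As a sketch, the two input modes look like this (the tool fields and values below are illustrative, not real data):

```python
# Illustrative payloads for a data-processing tool such as futuresearch_classify.
# Provide exactly one of "data" (inline rows) or "artifact_id" (prior upload).
inline_call = {
    "task": "Classify this company by sector",
    "categories": ["Technology", "Finance"],
    "data": [  # inline input: a list of flat row objects
        {"company": "Acme Corp", "website": "acme.example"},
        {"company": "Globex", "website": "globex.example"},
    ],
}

artifact_call = {
    "task": "Classify this company by sector",
    "categories": ["Technology", "Finance"],
    "artifact_id": "12345678-1234-1234-1234-123456789abc",  # UUID from upload_data
}

# Exactly one input mode per call:
assert ("data" in inline_call) != ("artifact_id" in inline_call)
assert ("data" in artifact_call) != ("artifact_id" in artifact_call)
```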
Run web research agents on each row.
Parameters:
- task: (required) Natural language description of research task
- artifact_id: Artifact ID (UUID) from upload_data or request_upload_url
- data: Inline data as a list of row objects
- response_schema: (optional) JSON schema for per-row agent response
- session_id: (optional) Session UUID to resume
- session_name: (optional) Name for a new session
Run a single research agent on one input (no CSV needed).
Parameters:
- task: (required) Natural language task for the agent
- input_data: (optional) Context as key-value pairs (e.g. {"company": "Acme"})
- response_schema: (optional) JSON schema for the agent response
- session_id: (optional) Session UUID to resume
- session_name: (optional) Name for a new session
Score and sort rows based on qualitative criteria.
Parameters:
- task: (required) Natural language instructions for scoring a single row
- field_name: (required) Name of the score field to add
- artifact_id: Artifact ID (UUID) from upload_data or request_upload_url
- data: Inline data as a list of row objects
- field_type: (optional) "float" (default), "int", "str", or "bool"
- ascending_order: (optional) Sort ascending (default: true)
- response_schema: (optional) JSON schema for the response model
- session_id / session_name: (optional)
Remove duplicate rows using semantic equivalence.
Parameters:
- equivalence_relation: (required) Natural language description of what makes rows duplicates
- artifact_id: Artifact ID (UUID) from upload_data or request_upload_url
- data: Inline data as a list of row objects
- session_id / session_name: (optional)
Join two tables using intelligent entity matching (LEFT JOIN semantics).
Parameters:
- task: (required) Natural language description of how to match rows
- left_artifact_id / left_data: (required, exactly one) Left table — the table being enriched (all rows kept)
- right_artifact_id / right_data: (required, exactly one) Right table — lookup/reference (columns appended to matches)
- merge_on_left: (optional) Only set if you expect exact string matches or want to draw agent attention to a column
- merge_on_right: (optional) Same as merge_on_left for right table
- relationship_type: (optional) "many_to_one" (default), "one_to_one", "one_to_many", "many_to_many"
- use_web_search: (optional) "auto" (default), "yes", or "no"
- session_id / session_name: (optional)
Forecast the probability of binary questions.
Parameters:
- artifact_id: Artifact ID (UUID) from upload_data or request_upload_url
- data: Inline data as a list of row objects (must include "question" column)
- context: (optional) Batch-level context for all questions
- session_id / session_name: (optional)
Classify each row into one of the provided categories.
Parameters:
- task: (required) Natural language classification instructions
- categories: (required) Allowed categories (minimum 2)
- artifact_id: Artifact ID (UUID) from upload_data or request_upload_url
- data: Inline data as a list of row objects
- classification_field: (optional) Output column name (default: "classification")
- include_reasoning: (optional) Include reasoning column (default: false)
- session_id / session_name: (optional)
Browse available reference lists of well-known entities (S&P 500, FTSE 100, countries, universities, etc.).
Parameters:
- search: (optional) Search term to match list names
- category: (optional) Filter by category (e.g. "Finance", "Geography")
Import a reference list into your session and save it as a CSV.
Parameters:
- artifact_id: (required) artifact_id from futuresearch_browse_lists results
Upload data from a URL or local file. Returns an artifact_id for use in processing tools.
Parameters:
- source: (required) HTTP(S) URL (Google Sheets supported) or local CSV path (stdio mode only)
- session_id / session_name: (optional)
Request a presigned URL to upload a local CSV file (HTTP mode only).
Parameters:
- filename: (required) Name of the file to upload (must end in .csv)
Steps: call this tool → execute the returned curl command → use the artifact_id from the response.
Check progress of a running task. Blocks briefly to limit polling rate.
Parameters:
- task_id: (required) Task ID returned by the operation tool
After receiving a status update, immediately call futuresearch_progress again unless the task is completed or failed.
Retrieve results from a completed task.
Parameters:
- task_id: (required) Task ID of the completed task
- output_path: (stdio) Full path to output CSV (must end in .csv)
- offset: (http, optional) Row offset for pagination (default: 0)
- page_size: (http, optional) Number of rows to load into context (default: auto threshold based on row count)
Only call after futuresearch_progress reports status "completed".
Cancel a running task.
Parameters:
- task_id: (required) Task ID to cancel
List sessions owned by the authenticated user (paginated).
Parameters:
- offset: (optional) Number of sessions to skip (default: 0)
- limit: (optional) Max sessions per page (default: 25, max: 1000)
List all tasks in a session with their IDs, statuses, and types.
Parameters:
- session_id: (required) Session ID (UUID) to list tasks for
Check the current billing balance for the authenticated user.
No parameters.
All operations return a result object. The data is available as a pandas DataFrame in result.data:
result = await rank(...)
print(result.data.head()) # pandas DataFrame
For quick one-off operations, sessions are created automatically.
Score rows based on criteria you can't put in a database field:
from futuresearch.ops import rank
result = await rank(
    task="Score by likelihood to need data integration solutions",
    input=leads_dataframe,
    field_name="integration_need_score",
    ascending_order=False,  # highest first
)
print(result.data.head())
Structured output - get more than just a score:
from pydantic import BaseModel, Field
class AcquisitionScore(BaseModel):
    fit_score: float = Field(description="0-100, strategic alignment")
    annual_revenue_usd: int = Field(description="Estimated annual revenue in USD")

result = await rank(
    task="Score acquisition targets by product-market fit",
    input=potential_acquisitions,
    field_name="fit_score",
    response_model=AcquisitionScore,
    ascending_order=False,
)
Parameters: task, input, field_name, field_type (default: "float"), response_model, ascending_order (default: True), preview, session
Remove duplicates using AI-powered semantic matching. The AI understands that "AbbVie Inc", "Abbvie", and "AbbVie Pharmaceutical" are the same company:
from futuresearch.ops import dedupe
result = await dedupe(
    input=crm_data,
    equivalence_relation="Two entries are duplicates if they represent the same legal entity",
)
print(result.data.head())
Strategies - control what happens after clusters are identified:
"select" (default): Pick the best representative from each cluster"identify": Cluster only, no selection (for manual review)"combine": Synthesize a single combined row per clusterresult = await dedupe(
input=crm_data,
equivalence_relation="Same legal entity",
strategy="select",
strategy_prompt="Prefer the record with the most complete contact information",
)
deduped = result.data[result.data["selected"] == True]
Results include equivalence_class_id (groups duplicates), equivalence_class_name (human-readable cluster name), and selected (the canonical record when using select/combine strategy).
Parameters: input, equivalence_relation, strategy, strategy_prompt, session
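With strategy="identify" the operation only clusters, so review happens in pandas afterwards. A minimal sketch over the documented result columns (the rows here are made up):

```python
import pandas as pd

# Stand-in for result.data after dedupe, with the documented columns.
result_df = pd.DataFrame([
    {"name": "AbbVie Inc", "equivalence_class_id": 0, "selected": True},
    {"name": "Abbvie",     "equivalence_class_id": 0, "selected": False},
    {"name": "Globex LLC", "equivalence_class_id": 1, "selected": True},
])

# Classes with more than one member contained duplicates.
sizes = result_df.groupby("equivalence_class_id").size()
dupes = result_df[result_df["equivalence_class_id"].isin(sizes[sizes > 1].index)]
print(dupes[["name", "equivalence_class_id"]])
```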
Join two tables when the keys don't match exactly (LEFT JOIN semantics). The AI knows "Photoshop" belongs to "Adobe" and "Genentech" is a Roche subsidiary:
from futuresearch.ops import merge
result = await merge(
    task="Match each software product to its parent company",
    left_table=software_products,    # table being enriched — all rows kept
    right_table=approved_suppliers,  # lookup/reference table — columns appended to matches
    # merge_on_left/merge_on_right: omit unless you expect exact string matches
    # on the chosen columns or want to draw agent attention to them.
)
print(result.data.head())
Parameters: task, left_table, right_table, merge_on_left, merge_on_right, relationship_type, use_web_search, session
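Since every left row is kept, unmatched rows surface as nulls in the appended right-table columns; checking for them is plain pandas. A sketch with made-up column names:

```python
import pandas as pd

# Stand-in for result.data after merge: left columns plus an appended right-table column.
merged = pd.DataFrame([
    {"product": "Photoshop",   "parent_company": "Adobe"},
    {"product": "UnknownTool", "parent_company": None},  # no match found
])

unmatched = merged[merged["parent_company"].isna()]
print(f"{len(unmatched)} of {len(merged)} rows had no match")
```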
Assign each row to one of the provided categories:
from futuresearch.ops import classify
result = await classify(
    task="Classify this company by its GICS industry sector",
    categories=["Energy", "Materials", "Industrials", "Consumer Discretionary",
                "Consumer Staples", "Health Care", "Financials",
                "Information Technology", "Communication Services",
                "Utilities", "Real Estate"],
    input=companies,
)
print(result.data[["company", "classification"]])
Binary classification - for yes/no questions, use two categories:
result = await classify(
    task="Is this company founder-led?",
    categories=["yes", "no"],
    input=companies,
)
With reasoning - understand why each row was classified:
result = await classify(
    task="Classify each company by its primary industry sector",
    categories=["Technology", "Finance", "Healthcare", "Energy"],
    input=companies,
    classification_field="sector",
    include_reasoning=True,
)
Parameters: task, categories, input, classification_field (default: "classification"), include_reasoning (default: False), session
Produce calibrated probability estimates for binary questions:
from futuresearch.ops import forecast
from pandas import DataFrame

result = await forecast(
    input=DataFrame([
        {"question": "Will the US Federal Reserve cut rates by at least 25bp before July 1, 2027?",
         "resolution_criteria": "Resolves YES if the Fed announces at least one rate cut of 25bp or more."},
    ]),
)
print(result.data[["question", "probability", "rationale"]])
Parameters: input, context, session
Run an AI agent on a single input:
from futuresearch.ops import single_agent
from pydantic import BaseModel

class CompanyInput(BaseModel):
    company: str

result = await single_agent(
    task="Find the company's most recent annual revenue and employee count",
    input=CompanyInput(company="Stripe"),
)
print(result.data.head())
No input required - agents can work without input data:
result = await single_agent(
    task="What company has reported the greatest cost reduction due to internal AI usage?",
)
Return a table - generate datasets from scratch:
from pydantic import BaseModel, Field

class CompanyInfo(BaseModel):
    company: str = Field(description="Company name")
    market_cap: int = Field(description="Market cap in USD")

result = await single_agent(
    task="Find the three largest US healthcare companies by market cap",
    response_model=CompanyInfo,
    return_table=True,
)
Parameters: task, input, effort_level (LOW/MEDIUM/HIGH), response_model, return_table, session
Run an AI agent across multiple rows:
from futuresearch.ops import agent_map
from pandas import DataFrame
result = await agent_map(
    task="Find this company's latest funding round and lead investors",
    input=DataFrame([
        {"company": "Anthropic"},
        {"company": "OpenAI"},
        {"company": "Mistral"},
    ]),
)
print(result.data.head())
Effort levels - control research thoroughness:
- LOW (default): Quick lookups, basic web searches
- MEDIUM: More thorough research, multiple sources
- HIGH: Deep research, cross-referencing sources

from futuresearch.ops import agent_map
from futuresearch.types import EffortLevel

result = await agent_map(
    task="Comprehensive competitive analysis",
    input=competitors,
    effort_level=EffortLevel.HIGH,
)
Parameters: task, input, effort_level, response_model, session
For multiple operations or when you need visibility into progress, use an explicit session:
from futuresearch import create_session
async with create_session(name="My Session") as session:
    print(f"View session at: {session.get_url()}")
    # All operations here share the same session
Sessions are visible on the futuresearch.ai dashboard.
All operations have _async variants for background processing. These need an explicit session since the task persists beyond the function call:
from futuresearch import create_session
from futuresearch.ops import rank_async
async with create_session(name="Async Ranking") as session:
    task = await rank_async(
        session=session,
        task="Score this organization",
        input=dataframe,
        field_name="score",
    )
    print(f"Task ID: {task.task_id}")  # Print this! Useful if your script crashes.
    # Continue with other work...
    result = await task.await_result()
Tip: Print the task ID after submitting. If your script crashes, you can fetch the result later using fetch_task_data:
from futuresearch import fetch_task_data
# Recover results from a crashed script
df = await fetch_task_data("12345678-1234-1234-1234-123456789abc")
FutureSearch operations (classify, rank, dedupe, merge, forecast, agent) take 1-10+ minutes. All MCP tools use an async pattern:
1. Call the operation tool (e.g. futuresearch_agent(...)) to get a task_id and session_url.
2. Poll futuresearch_progress with that task_id until the status is "completed" or "failed".
3. Once completed, retrieve results with the task-data retrieval tool.

Operations can be chained to build complete workflows. Each step's output feeds the next:
from futuresearch import create_session
from futuresearch.ops import classify, dedupe, rank
async with create_session(name="Lead Pipeline") as session:
    # 1. Filter to qualified leads
    classified = await classify(
        session=session,
        task="Does this lead have a company email domain (not gmail, yahoo, etc.)?",
        categories=["qualified", "unqualified"],
        input=leads,
    )
    # 2. Dedupe across sources
    deduped = await dedupe(
        session=session,
        input=classified.data[classified.data["classification"] == "qualified"],
        equivalence_relation="Same company, accounting for Inc/LLC variations",
    )
    # 3. Prioritize for outreach
    ranked = await rank(
        session=session,
        task="Score by likelihood to convert",
        input=deduped.data[deduped.data["selected"] == True],
        field_name="conversion_score",
    )
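The final ranked.data is an ordinary pandas DataFrame, so exporting the shortlist is standard pandas (the rows and score values below stand in for the pipeline's real output):

```python
import pandas as pd

# Stand-in for ranked.data from the pipeline above.
ranked_df = pd.DataFrame([
    {"company": "Acme",   "conversion_score": 0.91},
    {"company": "Globex", "conversion_score": 0.47},
])

top = ranked_df.sort_values("conversion_score", ascending=False).head(10)
top.to_csv("outreach_shortlist.csv", index=False)
```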
FutureSearch operations have associated costs. To avoid re-running them unnecessarily:
- preview=True: Operations like rank, classify, and merge support preview=True to process only a few rows first.
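One way to sketch the same idea manually: trial a small slice locally, inspect, then rerun on the full frame. preview=True does the row-limiting for you, so the slice below is only an illustration:

```python
import pandas as pd

leads_dataframe = pd.DataFrame({"company": [f"Company {i}" for i in range(100)]})

# Manual equivalent of a preview run: submit only a few rows first.
sample = leads_dataframe.head(5)
# result = await rank(task="Score by fit", input=sample, field_name="score")
# Inspect result.data, then rerun with input=leads_dataframe once satisfied.
```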