Extract and process data from GitHub repositories
Extracts READMEs, docs, code examples, issues, and releases from GitHub repositories for RAG ingestion. Use when you need to harvest repository content into a collection for context-aware coding or analysis.
/plugin marketplace add mindmorass/reflex
/plugin install reflex@mindmorass-reflex

This skill inherits all available tools. When active, it can use any tool Claude has access to.
Extract and ingest content from GitHub repositories into RAG.
GitHub repositories contain valuable documentation, code examples, and discussions. This skill covers extraction with the gh CLI and the GitHub API, chunking strategies for READMEs, code files, and issues, and a complete harvesting pipeline into RAG collections.
# GitHub CLI (recommended)
brew install gh # macOS
# or: https://cli.github.com/
# Python libraries
pip install PyGithub httpx
# Authenticate with GitHub CLI
gh auth login
# Or set token for API access
export GITHUB_TOKEN="ghp_..."
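Before a large harvest, it is worth confirming the token is picked up; a minimal check with PyGithub (unauthenticated access also works, but is limited to 60 requests/hour):

# Verify authentication and remaining API quota
import os
from github import Github

gh = Github(os.getenv("GITHUB_TOKEN"))
print(gh.get_rate_limit().core.remaining)  # authenticated tokens get 5000/hour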
Best for quick extraction and authenticated access.
#!/bin/bash
# Extract repo content using gh CLI
REPO="$1" # owner/repo format
# Clone with depth 1 for content only
gh repo clone "$REPO" -- --depth 1
# Get repo info
gh repo view "$REPO" --json name,description

# Fetch the README via the REST API (raw content)
gh api "repos/$REPO/readme" -H "Accept: application/vnd.github.raw"
# Get issues
gh issue list --repo "$REPO" --limit 100 --json title,body,comments
# Get discussions (if enabled): repository discussions are GraphQL-only
gh api graphql -f query='
  query($owner: String!, $name: String!) {
    repository(owner: $owner, name: $name) {
      discussions(first: 50) { nodes { title body url } }
    }
  }' -f owner="${REPO%/*}" -f name="${REPO#*/}"
# Get releases
gh release list --repo "$REPO" --limit 20
Better for programmatic access and complex queries.
#!/usr/bin/env python3
"""GitHub content extraction using PyGithub."""
from github import Github
from typing import Dict, List, Optional
import base64
import os
class GitHubExtractor:
"""Extract content from GitHub repositories."""
    def __init__(self, token: Optional[str] = None):
self.token = token or os.getenv("GITHUB_TOKEN")
self.github = Github(self.token) if self.token else Github()
def get_repo(self, repo_name: str):
"""Get repository object."""
return self.github.get_repo(repo_name)
def get_readme(self, repo_name: str) -> Dict:
"""Extract README content."""
repo = self.get_repo(repo_name)
try:
readme = repo.get_readme()
content = base64.b64decode(readme.content).decode('utf-8')
return {
"content": content,
"path": readme.path,
"size": readme.size,
"url": readme.html_url
}
except Exception as e:
return {"error": str(e)}
def get_docs(self, repo_name: str) -> List[Dict]:
"""Extract documentation files."""
repo = self.get_repo(repo_name)
docs = []
# Common doc locations
doc_paths = ['docs', 'doc', 'documentation', '.github']
for path in doc_paths:
try:
contents = repo.get_contents(path)
docs.extend(self._extract_dir(repo, contents))
except Exception:
continue
# Also get root markdown files
try:
root_contents = repo.get_contents("")
for item in root_contents:
if item.type == "file" and item.name.endswith('.md'):
content = base64.b64decode(item.content).decode('utf-8')
docs.append({
"path": item.path,
"content": content,
"url": item.html_url
})
except Exception:
pass
return docs
def _extract_dir(self, repo, contents) -> List[Dict]:
"""Recursively extract directory contents."""
docs = []
if not isinstance(contents, list):
contents = [contents]
for item in contents:
if item.type == "dir":
sub_contents = repo.get_contents(item.path)
docs.extend(self._extract_dir(repo, sub_contents))
elif item.type == "file":
if item.name.endswith(('.md', '.rst', '.txt')):
try:
content = base64.b64decode(item.content).decode('utf-8')
docs.append({
"path": item.path,
"content": content,
"url": item.html_url
})
except Exception:
pass
return docs
def get_code_examples(
self,
repo_name: str,
        patterns: Optional[List[str]] = None
) -> List[Dict]:
"""Extract code examples from repository."""
repo = self.get_repo(repo_name)
examples = []
if patterns is None:
patterns = ['examples', 'samples', 'demo', 'tutorials']
for pattern in patterns:
try:
contents = repo.get_contents(pattern)
examples.extend(self._extract_code(repo, contents))
except Exception:
continue
return examples
def _extract_code(self, repo, contents) -> List[Dict]:
"""Extract code files."""
code = []
code_extensions = ['.py', '.js', '.ts', '.go', '.rs', '.java', '.rb']
if not isinstance(contents, list):
contents = [contents]
for item in contents:
if item.type == "dir":
sub = repo.get_contents(item.path)
code.extend(self._extract_code(repo, sub))
elif item.type == "file":
if any(item.name.endswith(ext) for ext in code_extensions):
try:
content = base64.b64decode(item.content).decode('utf-8')
code.append({
"path": item.path,
"content": content,
"language": self._detect_language(item.name),
"url": item.html_url
})
except Exception:
pass
return code
def _detect_language(self, filename: str) -> str:
"""Detect programming language from filename."""
ext_map = {
'.py': 'python',
'.js': 'javascript',
'.ts': 'typescript',
'.go': 'go',
'.rs': 'rust',
'.java': 'java',
'.rb': 'ruby',
'.sh': 'bash',
}
for ext, lang in ext_map.items():
if filename.endswith(ext):
return lang
return 'unknown'
def get_issues(
self,
repo_name: str,
state: str = "all",
limit: int = 100
) -> List[Dict]:
"""Extract issues with comments."""
repo = self.get_repo(repo_name)
issues = []
        for issue in repo.get_issues(state=state)[:limit]:
            if issue.pull_request:  # get_issues also returns PRs; skip them
                continue
issue_data = {
"number": issue.number,
"title": issue.title,
"body": issue.body or "",
"state": issue.state,
"labels": [l.name for l in issue.labels],
"created_at": issue.created_at.isoformat(),
"url": issue.html_url,
"comments": []
}
# Get comments
for comment in issue.get_comments():
issue_data["comments"].append({
"body": comment.body,
"author": comment.user.login,
"created_at": comment.created_at.isoformat()
})
issues.append(issue_data)
return issues
    def get_discussions(self, repo_name: str, limit: int = 50) -> List[Dict]:
        """Extract discussions via the GraphQL API (REST does not expose them)."""
        import httpx
        if not self.token:
            return []  # GraphQL requires an authenticated token
        query = """
        query($owner: String!, $name: String!, $first: Int!) {
          repository(owner: $owner, name: $name) {
            discussions(first: $first) {
              nodes {
                title
                body
                url
                category { name }
                comments(first: 10) {
                  nodes { body }
                }
              }
            }
          }
        }
        """
        owner, name = repo_name.split('/')
        response = httpx.post(
            "https://api.github.com/graphql",
            json={"query": query,
                  "variables": {"owner": owner, "name": name, "first": limit}},
            headers={"Authorization": f"Bearer {self.token}"},
            timeout=30.0,
        )
        response.raise_for_status()
        # Guard against repos with discussions disabled (repository comes back null)
        data = response.json().get("data") or {}
        repo_data = data.get("repository") or {}
        discussions = repo_data.get("discussions") or {}
        return discussions.get("nodes") or []
def get_releases(self, repo_name: str, limit: int = 20) -> List[Dict]:
"""Extract release information."""
repo = self.get_repo(repo_name)
releases = []
for release in repo.get_releases()[:limit]:
releases.append({
"tag": release.tag_name,
"name": release.title,
"body": release.body or "",
"published_at": release.published_at.isoformat() if release.published_at else None,
"url": release.html_url,
"prerelease": release.prerelease
})
return releases
def get_repo_metadata(self, repo_name: str) -> Dict:
"""Get repository metadata."""
repo = self.get_repo(repo_name)
return {
"name": repo.name,
"full_name": repo.full_name,
"description": repo.description,
"topics": repo.get_topics(),
"language": repo.language,
"stars": repo.stargazers_count,
"forks": repo.forks_count,
"created_at": repo.created_at.isoformat(),
"updated_at": repo.updated_at.isoformat(),
"url": repo.html_url,
"homepage": repo.homepage
}
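A minimal usage sketch of the extractor (the repo name is illustrative):

# Pull metadata and README for a single repository
extractor = GitHubExtractor()
meta = extractor.get_repo_metadata("fastapi/fastapi")
readme = extractor.get_readme("fastapi/fastapi")
print(meta["stars"], readme.get("path"), len(readme.get("content", "")))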
def chunk_readme(content: str) -> List[Dict]:
"""Chunk README by sections."""
import re
sections = []
current_section = {"heading": "Overview", "content": "", "level": 1}
    for line in content.split('\n'):
heading_match = re.match(r'^(#{1,3})\s+(.+)$', line)
if heading_match:
# Save current section
if current_section["content"].strip():
sections.append(current_section)
level = len(heading_match.group(1))
heading = heading_match.group(2)
current_section = {"heading": heading, "content": "", "level": level}
else:
current_section["content"] += line + "
"
# Don't forget last section
if current_section["content"].strip():
sections.append(current_section)
return sections
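For example, on a tiny hypothetical README, chunk_readme yields one chunk per heading:

# Quick illustration with an inline sample
sample = "# MyLib\nA small library.\n## Install\npip install mylib\n"
for section in chunk_readme(sample):
    print(section["level"], section["heading"], section["content"].strip())
# 1 MyLib A small library.
# 2 Install pip install mylib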
def chunk_code_file(content: str, language: str) -> List[Dict]:
"""Chunk code file by functions/classes."""
import ast
if language != 'python':
# For non-Python, chunk by size
return [{"content": content, "type": "file"}]
try:
tree = ast.parse(content)
except SyntaxError:
return [{"content": content, "type": "file"}]
chunks = []
for node in ast.iter_child_nodes(tree):
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
source = ast.get_source_segment(content, node)
if source:
chunks.append({
"content": source,
"type": "function",
"name": node.name,
"docstring": ast.get_docstring(node)
})
elif isinstance(node, ast.ClassDef):
source = ast.get_source_segment(content, node)
if source:
chunks.append({
"content": source,
"type": "class",
"name": node.name,
"docstring": ast.get_docstring(node)
})
return chunks if chunks else [{"content": content, "type": "file"}]
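A short example of the Python path (the sample source is hypothetical):

# Top-level functions become individual chunks with their docstrings
src = 'def add(a, b):\n    """Add two numbers."""\n    return a + b\n'
for chunk in chunk_code_file(src, "python"):
    print(chunk["type"], chunk["name"], chunk["docstring"])
# function add Add two numbers.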
def chunk_issue(issue: Dict) -> List[Dict]:
"""Chunk issue with comments."""
chunks = []
# Issue body as main chunk
chunks.append({
"content": f"# {issue['title']}
{issue['body']}",
"type": "issue",
"issue_number": issue["number"]
})
# Significant comments as separate chunks
for i, comment in enumerate(issue.get("comments", [])):
if len(comment["body"]) > 200: # Only substantial comments
chunks.append({
"content": comment["body"],
"type": "comment",
"issue_number": issue["number"],
"comment_index": i
})
return chunks
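With a made-up issue, only the body and the one substantial comment survive chunking:

# Short comments (<= 200 chars) are dropped; the body always becomes a chunk
issue = {
    "number": 42,
    "title": "Crash on startup",
    "body": "Stack trace attached.",
    "comments": [{"body": "me too"}, {"body": "x" * 250}],
}
print([c["type"] for c in chunk_issue(issue)])
# ['issue', 'comment']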
#!/usr/bin/env python3
"""Complete GitHub harvesting pipeline."""
from datetime import datetime
from typing import Dict, List
import hashlib
async def harvest_github_repo(
repo_name: str,
collection: str,
include_readme: bool = True,
include_docs: bool = True,
include_examples: bool = True,
include_issues: bool = False,
include_releases: bool = True,
max_issues: int = 50
) -> Dict:
"""
Harvest a GitHub repository into RAG.
Args:
repo_name: Repository in owner/repo format
collection: Target RAG collection
include_*: What content to harvest
max_issues: Maximum issues to harvest
"""
extractor = GitHubExtractor()
# Get repo metadata
repo_meta = extractor.get_repo_metadata(repo_name)
base_metadata = {
"source_type": "github",
"repo": repo_name,
"repo_description": repo_meta.get("description"),
"repo_language": repo_meta.get("language"),
"repo_topics": repo_meta.get("topics", []),
"harvested_at": datetime.now().isoformat()
}
stats = {
"readme": 0,
"docs": 0,
"examples": 0,
"issues": 0,
"releases": 0
}
# Harvest README
if include_readme:
readme = extractor.get_readme(repo_name)
if "content" in readme:
sections = chunk_readme(readme["content"])
for i, section in enumerate(sections):
metadata = {
**base_metadata,
"content_type": "readme",
"section": section["heading"],
"section_level": section["level"],
"chunk_index": i,
"source_url": readme["url"]
}
await ingest(
content=section["content"],
collection=collection,
metadata=metadata,
doc_id=f"gh_{repo_name.replace('/', '_')}_readme_{i}"
)
stats["readme"] += 1
# Harvest docs
if include_docs:
docs = extractor.get_docs(repo_name)
for doc in docs:
metadata = {
**base_metadata,
"content_type": "documentation",
"file_path": doc["path"],
"source_url": doc["url"]
}
await ingest(
content=doc["content"],
collection=collection,
metadata=metadata,
doc_id=f"gh_{repo_name.replace('/', '_')}_doc_{hashlib.md5(doc['path'].encode()).hexdigest()[:8]}"
)
stats["docs"] += 1
# Harvest code examples
if include_examples:
examples = extractor.get_code_examples(repo_name)
for example in examples:
chunks = chunk_code_file(example["content"], example["language"])
for i, chunk in enumerate(chunks):
metadata = {
**base_metadata,
"content_type": "code_example",
"file_path": example["path"],
"language": example["language"],
"code_type": chunk.get("type", "file"),
"code_name": chunk.get("name", ""),
"source_url": example["url"]
}
await ingest(
content=chunk["content"],
collection=collection,
metadata=metadata,
doc_id=f"gh_{repo_name.replace('/', '_')}_code_{hashlib.md5(example['path'].encode()).hexdigest()[:8]}_{i}"
)
stats["examples"] += 1
# Harvest issues
if include_issues:
issues = extractor.get_issues(repo_name, limit=max_issues)
for issue in issues:
chunks = chunk_issue(issue)
for chunk in chunks:
metadata = {
**base_metadata,
"content_type": chunk["type"],
"issue_number": issue["number"],
"issue_title": issue["title"],
"issue_state": issue["state"],
"issue_labels": issue["labels"],
"source_url": issue["url"]
}
await ingest(
content=chunk["content"],
collection=collection,
metadata=metadata,
doc_id=f"gh_{repo_name.replace('/', '_')}_issue_{issue['number']}_{chunk.get('comment_index', 0)}"
)
stats["issues"] += 1
# Harvest releases
if include_releases:
releases = extractor.get_releases(repo_name)
for release in releases:
if release["body"]: # Only if has release notes
metadata = {
**base_metadata,
"content_type": "release",
"release_tag": release["tag"],
"release_name": release["name"],
"published_at": release["published_at"],
"source_url": release["url"]
}
await ingest(
content=f"# {release['name']}
{release['body']}",
collection=collection,
metadata=metadata,
doc_id=f"gh_{repo_name.replace('/', '_')}_release_{release['tag']}"
)
stats["releases"] += 1
return {
"status": "success",
"repo": repo_name,
"collection": collection,
"harvested": stats,
"total": sum(stats.values())
}
# GitHub content metadata
source_type: github
repo: owner/repo
repo_description: "Repository description"
repo_language: Python
repo_topics: [topic1, topic2]
content_type: readme|documentation|code_example|issue|release
file_path: docs/guide.md (for docs/code)
language: python (for code)
code_type: function|class|file
code_name: function_name
issue_number: 123
issue_title: "Issue title"
issue_state: open|closed
issue_labels: [bug, help wanted]
release_tag: v1.0.0
source_url: https://github.com/...
harvested_at: "2024-01-01T12:00:00Z"
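Concretely, a single README chunk produced by the pipeline carries metadata along these lines (all values illustrative):

chunk_metadata = {
    "source_type": "github",
    "repo": "fastapi/fastapi",
    "repo_language": "Python",
    "repo_topics": ["api", "async"],
    "content_type": "readme",
    "section": "Installation",
    "section_level": 2,
    "chunk_index": 3,
    "source_url": "https://github.com/fastapi/fastapi#readme",
    "harvested_at": "2024-01-01T12:00:00",
}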
# Full repository harvest
result = await harvest_github_repo(
repo_name="anthropics/anthropic-sdk-python",
collection="anthropic_sdk",
include_readme=True,
include_docs=True,
include_examples=True,
include_issues=False,
include_releases=True
)
# Issues focus
result = await harvest_github_repo(
repo_name="langchain-ai/langchain",
collection="langchain_issues",
include_readme=False,
include_docs=False,
include_issues=True,
max_issues=200
)
# Code examples only
result = await harvest_github_repo(
repo_name="fastapi/fastapi",
collection="fastapi_examples",
include_readme=True,
include_docs=False,
include_examples=True
)
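Since harvest_github_repo is a coroutine, a plain script can drive it with asyncio (assuming the host environment supplies the ingest function):

# Run the pipeline outside an existing event loop
import asyncio

result = asyncio.run(harvest_github_repo(
    repo_name="fastapi/fastapi",
    collection="fastapi_examples",
))
print(result["total"], "chunks ingested")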
# Using gh CLI for quick extraction
gh repo clone owner/repo -- --depth 1
gh api repos/owner/repo/readme -H "Accept: application/vnd.github.raw"
# Get issues as JSON
gh issue list --repo owner/repo --json title,body,comments --limit 50
This skill should be used when the user asks to "create an agent", "add an agent", "write a subagent", "agent frontmatter", "when to use description", "agent examples", "agent tools", "agent colors", "autonomous agent", or needs guidance on agent structure, system prompts, triggering conditions, or agent development best practices for Claude Code plugins.
This skill should be used when the user asks to "create a slash command", "add a command", "write a custom command", "define command arguments", "use command frontmatter", "organize commands", "create command with file references", "interactive command", "use AskUserQuestion in command", or needs guidance on slash command structure, YAML frontmatter fields, dynamic arguments, bash execution in commands, user interaction patterns, or command development best practices for Claude Code.
This skill should be used when the user asks to "create a hook", "add a PreToolUse/PostToolUse/Stop hook", "validate tool use", "implement prompt-based hooks", "use ${CLAUDE_PLUGIN_ROOT}", "set up event-driven automation", "block dangerous commands", or mentions hook events (PreToolUse, PostToolUse, Stop, SubagentStop, SessionStart, SessionEnd, UserPromptSubmit, PreCompact, Notification). Provides comprehensive guidance for creating and implementing Claude Code plugin hooks with focus on advanced prompt-based hooks API.