Builds automated PII detection and redaction pipelines using spaCy NER, Microsoft Presidio, and AWS Macie. Handles confidence scoring, custom entities, batch workflows, and multi-format document scanning.
npx claudepluginhub mukul975/privacy-data-protection-skills --plugin privacy-skills-complete
This skill uses the workspace's default tool permissions.
Automated PII detection is a foundational capability for privacy engineering, enabling organizations to discover, classify, and protect personal data at scale. This skill covers building production-grade PII detection pipelines that combine rule-based pattern matching, machine learning-based Named Entity Recognition (NER), and cloud-native discovery services.
| Entity Type | Examples | Detection Method | Risk Level |
|---|---|---|---|
| PERSON_NAME | "John Smith", "Maria Garcia" | NER model | High |
| EMAIL_ADDRESS | "j.smith@cipherengineeringlabs.com" | Regex pattern | High |
| PHONE_NUMBER | "+1-555-0123", "(555) 012-3456" | Regex + validation | High |
| SSN | "123-45-6789" | Regex + format validation (SSNs have no checksum) | Critical |
| PASSPORT_NUMBER | "AB1234567" | Regex per country format | Critical |
| DRIVER_LICENSE | "D123-4567-8901" | Regex per state/country | Critical |
| CREDIT_CARD | "4111-1111-1111-1111" | Regex + Luhn checksum | Critical |
| IBAN | "GB82 WEST 1234 5698 7654 32" | Regex + modulo-97 check | High |
| IP_ADDRESS | "192.168.1.1", "2001:db8::1" | Regex (IPv4/IPv6) | Medium |
| MAC_ADDRESS | "00:1A:2B:3C:4D:5E" | Regex pattern | Medium |
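The checksum-based methods in the table can be sketched with the standard library alone. The function names `luhn_valid` and `iban_valid` are illustrative, and the 12-digit lower bound for card-like numbers is an assumption rather than part of the Luhn algorithm.

```python
import re


def luhn_valid(number: str) -> bool:
    """Return True if the digits in `number` pass the Luhn checksum."""
    digits = [int(c) for c in number if c.isdigit()]
    if len(digits) < 12:  # assumed lower bound for card-like numbers
        return False
    total = 0
    # Walk right to left, doubling every second digit.
    for i, d in enumerate(reversed(digits)):
        if i % 2 == 1:
            d *= 2
            if d > 9:
                d -= 9
        total += d
    return total % 10 == 0


def iban_valid(iban: str) -> bool:
    """Return True if `iban` passes the modulo-97 check."""
    compact = re.sub(r"\s+", "", iban).upper()
    if not re.fullmatch(r"[A-Z]{2}\d{2}[A-Z0-9]{11,30}", compact):
        return False
    # Move country code and check digits to the end, then map A=10 ... Z=35.
    rearranged = compact[4:] + compact[:4]
    numeric = "".join(str(int(ch, 36)) for ch in rearranged)
    return int(numeric) % 97 == 1
```

Running a validator like this after the regex match lets a pipeline raise the score of candidates that pass and discard ones that fail, which cuts false positives on arbitrary digit strings.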
| Entity Type | Examples | Detection Method | Risk Level |
|---|---|---|---|
| DATE_OF_BIRTH | "1990-01-15", "January 15, 1990" | NER + date parsing | Medium |
| POSTAL_CODE | "10001", "SW1A 1AA" | Regex per country | Medium |
| AGE | "35 years old", "age: 42" | NER + context | Low-Medium |
| GENDER | "male", "female", "non-binary" | Dictionary + context | Low |
| NATIONALITY | "British", "Japanese" | Dictionary + NER | Low-Medium |
| LOCATION | "123 Main St", "New York" | NER model | Medium |
| Entity Type | Examples | Detection Method | Risk Level |
|---|---|---|---|
| MEDICAL_RECORD | "MRN: 12345678" | Regex + context | Critical |
| HEALTH_CONDITION | "diabetes", "HIV positive" | Medical NER + dictionary | Critical |
| RELIGIOUS_BELIEF | "Muslim", "Catholic" | Dictionary + context | High |
| POLITICAL_OPINION | "Democratic Party member" | Dictionary + context | High |
| SEXUAL_ORIENTATION | "gay", "bisexual" | Dictionary + context | High |
| BIOMETRIC_DATA | "fingerprint hash: ..." | Context + pattern | Critical |
| GENETIC_DATA | "BRCA1 positive" | Medical dictionary | Critical |
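The "Dictionary + context" method in the table can be illustrated as follows: a term match alone gets a low score, and nearby context words raise it. The term lists, context lists, window size, and score values are all hypothetical placeholders; real deployments need curated vocabularies and tuned scores.

```python
import re
from dataclasses import dataclass

# Hypothetical vocabularies, for illustration only.
TERMS = {
    "HEALTH_CONDITION": ["diabetes", "hiv positive"],
    "RELIGIOUS_BELIEF": ["muslim", "catholic"],
}
CONTEXT = {
    "HEALTH_CONDITION": ["diagnosed", "patient", "treatment"],
    "RELIGIOUS_BELIEF": ["religion", "faith", "practising"],
}


@dataclass
class Match:
    entity_type: str
    text: str
    start: int
    end: int
    score: float


def find_special_category(text: str, window: int = 40,
                          base: float = 0.4, boost: float = 0.35) -> list[Match]:
    """Match dictionary terms and raise the score when context words are nearby."""
    matches = []
    for entity_type, terms in TERMS.items():
        for term in terms:
            pattern = rf"\b{re.escape(term)}\b"
            for m in re.finditer(pattern, text, re.IGNORECASE):
                # Look for context words within `window` characters of the match.
                nearby = text[max(0, m.start() - window): m.end() + window]
                has_context = any(
                    re.search(rf"\b{re.escape(word)}\b", nearby, re.IGNORECASE)
                    for word in CONTEXT[entity_type]
                )
                score = base + boost if has_context else base
                matches.append(Match(entity_type, m.group(0), m.start(), m.end(), score))
    return sorted(matches, key=lambda match: match.start)
```

With these placeholder values, "Catholic" in a university name scores 0.4, while "diabetes" next to "diagnosed" scores 0.75, so a threshold between the two separates them.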
Input Data --> Presidio Analyzer --> Detected Entities --> Presidio Anonymizer --> Redacted Output
                       |                     |                      |
                       v                     v                      v
                +-------------+      +--------------+       +---------------+
                | Recognizers |      | Score Filter |       | Operators     |
                | - Pattern   |      | (threshold)  |       | - Replace     |
                | - NER       |      |              |       | - Redact      |
                | - Custom    |      |              |       | - Hash        |
                +-------------+      +--------------+       | - Mask        |
                                                            | - Encrypt     |
                                                            +---------------+
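The three stages in the diagram (recognizers, score filter, operators) can be sketched without Presidio to show how the data flows. This is a toy model using only the standard library, not Presidio's implementation; the recognizer patterns, fixed scores, and function names are assumptions made for illustration.

```python
import hashlib
import re

# Hypothetical recognizers: (entity type, compiled pattern, fixed confidence score).
RECOGNIZERS = [
    ("EMAIL_ADDRESS", re.compile(r"\b[\w.+-]+@[\w-]+(?:\.[\w-]+)+\b"), 0.9),
    ("IP_ADDRESS", re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b"), 0.6),
]

# Operators turn a matched value into its redacted form.
OPERATORS = {
    "replace": lambda value, entity: f"<{entity}>",
    "redact": lambda value, entity: "",
    "mask": lambda value, entity: "*" * len(value),
    "hash": lambda value, entity: hashlib.sha256(value.encode("utf-8")).hexdigest(),
}


def analyze(text, threshold=0.5):
    """Run every recognizer, then drop results below the score threshold."""
    hits = [
        (m.start(), m.end(), entity, score)
        for entity, pattern, score in RECOGNIZERS
        for m in pattern.finditer(text)
    ]
    return sorted(hit for hit in hits if hit[3] >= threshold)


def anonymize(text, hits, operator="replace"):
    """Apply the operator right to left so earlier offsets stay valid."""
    apply = OPERATORS[operator]
    for start, end, entity, _ in sorted(hits, reverse=True):
        text = text[:start] + apply(text[start:end], entity) + text[end:]
    return text
```

The sketch does not resolve overlapping matches, which a real pipeline has to handle before substitution.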
"""
PII detection and redaction pipeline using Microsoft Presidio.
Supports structured and unstructured text with configurable
entity types, confidence thresholds, and redaction strategies.
"""
from dataclasses import dataclass, field

from presidio_analyzer import AnalyzerEngine, Pattern, PatternRecognizer
from presidio_analyzer.nlp_engine import NlpEngineProvider
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig


@dataclass
class DetectionResult:
    entity_type: str
    text: str
    start: int
    end: int
    score: float
    source: str


@dataclass
class PipelineConfig:
    language: str = "en"
    score_threshold: float = 0.5
    entities_to_detect: list[str] = field(default_factory=lambda: [
        "PERSON", "EMAIL_ADDRESS", "PHONE_NUMBER", "CREDIT_CARD",
        "US_SSN", "US_DRIVER_LICENSE", "IBAN_CODE", "IP_ADDRESS",
        "LOCATION", "DATE_TIME", "NRP", "MEDICAL_LICENSE",
        "US_PASSPORT", "US_BANK_NUMBER", "UK_NHS",
        # Custom entities registered by the pipeline. analyze() only returns
        # entity types named in this list, so they must be included here.
        "UK_NINO", "UK_VRN", "EMPLOYEE_ID"
    ])
    # One of: replace, redact, hash, mask. "encrypt" needs a key and is not
    # wired up in this pipeline; unknown values fall back to replace.
    redaction_strategy: str = "replace"


class PIIDetectionPipeline:
    """
    Production PII detection pipeline built on Microsoft Presidio.
    Supports custom entity types and configurable redaction strategies.
    """

    def __init__(self, config: PipelineConfig):
        self.config = config
        # Initialize NLP engine with spaCy
        nlp_config = {
            "nlp_engine_name": "spacy",
            "models": [{"lang_code": config.language, "model_name": "en_core_web_lg"}]
        }
        nlp_engine = NlpEngineProvider(nlp_configuration=nlp_config).create_engine()
        # Initialize analyzer
        self.analyzer = AnalyzerEngine(nlp_engine=nlp_engine)
        # Register custom recognizers
        self._register_custom_recognizers()
        # Initialize anonymizer
        self.anonymizer = AnonymizerEngine()

    def _register_custom_recognizers(self):
        """Register custom PII recognizers beyond built-in types."""
        # UK National Insurance Number
        nino_pattern = Pattern(
            name="uk_nino",
            regex=r"\b[A-CEGHJ-PR-TW-Z]{2}\s?\d{2}\s?\d{2}\s?\d{2}\s?[A-D]\b",
            score=0.85
        )
        nino_recognizer = PatternRecognizer(
            supported_entity="UK_NINO",
            patterns=[nino_pattern],
            supported_language="en"
        )
        self.analyzer.registry.add_recognizer(nino_recognizer)

        # Vehicle Registration Number (UK format)
        vrn_pattern = Pattern(
            name="uk_vrn",
            regex=r"\b[A-Z]{2}\d{2}\s?[A-Z]{3}\b",
            score=0.6
        )
        vrn_recognizer = PatternRecognizer(
            supported_entity="UK_VRN",
            patterns=[vrn_pattern],
            supported_language="en"
        )
        self.analyzer.registry.add_recognizer(vrn_recognizer)

        # Employee ID (Cipher Engineering Labs format: CEL-XXXXX)
        emp_id_pattern = Pattern(
            name="employee_id",
            regex=r"\bCEL-\d{5}\b",
            score=0.95
        )
        emp_id_recognizer = PatternRecognizer(
            supported_entity="EMPLOYEE_ID",
            patterns=[emp_id_pattern],
            supported_language="en"
        )
        self.analyzer.registry.add_recognizer(emp_id_recognizer)

    def detect(self, text: str) -> list[DetectionResult]:
        """
        Detect PII entities in text.

        Args:
            text: Input text to scan

        Returns:
            List of detected PII entities with confidence scores
        """
        results = self.analyzer.analyze(
            text=text,
            entities=self.config.entities_to_detect,
            language=self.config.language,
            score_threshold=self.config.score_threshold,
            # analysis_explanation is only populated when the decision process
            # is requested; without it, source would always be "unknown".
            return_decision_process=True
        )
        return [
            DetectionResult(
                entity_type=r.entity_type,
                text=text[r.start:r.end],
                start=r.start,
                end=r.end,
                score=r.score,
                source=r.analysis_explanation.recognizer if r.analysis_explanation else "unknown"
            )
            for r in results
        ]

    def redact(self, text: str) -> tuple[str, list[DetectionResult]]:
        """
        Detect and redact PII from text.

        Returns:
            Tuple of (redacted_text, list_of_detections)
        """
        # Detect entities
        analyzer_results = self.analyzer.analyze(
            text=text,
            entities=self.config.entities_to_detect,
            language=self.config.language,
            score_threshold=self.config.score_threshold
        )
        # Configure redaction operator
        operators = self._get_operators()
        # Apply redaction
        anonymized = self.anonymizer.anonymize(
            text=text,
            analyzer_results=analyzer_results,
            operators=operators
        )
        detections = [
            DetectionResult(
                entity_type=r.entity_type,
                text=text[r.start:r.end],
                start=r.start,
                end=r.end,
                score=r.score,
                source="presidio"
            )
            for r in analyzer_results
        ]
        return anonymized.text, detections

    def _get_operators(self) -> dict:
        """Configure anonymization operators based on strategy."""
        if self.config.redaction_strategy == "replace":
            return {"DEFAULT": OperatorConfig("replace", {"new_value": "<REDACTED>"})}
        elif self.config.redaction_strategy == "hash":
            return {"DEFAULT": OperatorConfig("hash", {"hash_type": "sha256"})}
        elif self.config.redaction_strategy == "mask":
            # The operator name is the first argument; the params dict only
            # carries the mask settings.
            return {"DEFAULT": OperatorConfig("mask", {
                "masking_char": "*",
                "chars_to_mask": 100,
                "from_end": False
            })}
        elif self.config.redaction_strategy == "redact":
            return {"DEFAULT": OperatorConfig("redact", {})}
        else:
            # Unknown strategies (including "encrypt", which would need a key)
            # fall back to replacement.
            return {"DEFAULT": OperatorConfig("replace", {"new_value": "<REDACTED>"})}


class BatchPIIProcessor:
    """
    Process large volumes of documents for PII detection.
    Supports structured (CSV/JSON) and unstructured (text) formats.
    """

    def __init__(self, pipeline: PIIDetectionPipeline):
        self.pipeline = pipeline

    def process_csv(
        self,
        input_path: str,
        output_path: str,
        columns_to_scan: list[str] = None
    ) -> dict:
        """
        Scan and redact PII in a CSV file.

        Args:
            input_path: Path to input CSV
            output_path: Path to write redacted CSV
            columns_to_scan: Columns to scan (all if None)

        Returns:
            Summary statistics of detections
        """
        from collections import Counter

        import pandas as pd

        df = pd.read_csv(input_path)
        stats = {"total_rows": len(df), "detections": {}, "columns_scanned": []}
        scan_columns = columns_to_scan or df.columns.tolist()
        for col in scan_columns:
            if col not in df.columns:
                continue
            stats["columns_scanned"].append(col)
            # Redacted values are strings, so hold the column as generic objects
            # before writing them back into what may be a numeric column.
            df[col] = df[col].astype(object)
            col_detections = Counter()
            for idx, value in df[col].items():
                if pd.isna(value):
                    continue
                text = str(value)
                redacted, detections = self.pipeline.redact(text)
                df.at[idx, col] = redacted
                col_detections.update(d.entity_type for d in detections)
            # Count detection types for this column
            for entity_type, count in col_detections.items():
                key = f"{col}:{entity_type}"
                stats["detections"][key] = count
        df.to_csv(output_path, index=False)
        stats["total_detections"] = sum(stats["detections"].values())
        return stats

    def process_text_files(
        self,
        file_paths: list[str],
        output_dir: str
    ) -> dict:
        """
        Batch process text files for PII detection and redaction.
        Returns summary statistics.
        """
        import os

        # Entity types whose presence marks a file as high risk
        critical_types = {"US_SSN", "CREDIT_CARD", "MEDICAL_LICENSE", "UK_NHS"}
        stats = {
            "files_processed": 0,
            "total_detections": 0,
            "entity_counts": {},
            "high_risk_files": []
        }
        # Create the output directory up front so the writes below cannot fail on it
        os.makedirs(output_dir, exist_ok=True)
        for file_path in file_paths:
            with open(file_path, "r", encoding="utf-8") as f:
                text = f.read()
            redacted, detections = self.pipeline.redact(text)
            # Write redacted output (inputs that share a basename overwrite each other)
            output_path = os.path.join(output_dir, os.path.basename(file_path))
            with open(output_path, "w", encoding="utf-8") as f:
                f.write(redacted)
            # Update statistics
            stats["files_processed"] += 1
            stats["total_detections"] += len(detections)
            for d in detections:
                stats["entity_counts"][d.entity_type] = (
                    stats["entity_counts"].get(d.entity_type, 0) + 1
                )
            # Flag high-risk files (contain critical PII)
            if any(d.entity_type in critical_types for d in detections):
                stats["high_risk_files"].append(file_path)
        return stats
"""
Train custom spaCy NER model for domain-specific PII entities.
"""
import random

import spacy
from spacy.training import Example


def create_training_data() -> list[tuple[str, dict]]:
    """
    Create training examples for custom PII entity types.
    Returns list of (text, annotations) tuples.
    """
    # Entities are (start, end, label) character offsets, with end exclusive.
    training_data = [
        (
            "Patient MRN 12345678 was admitted on 2024-01-15",
            {"entities": [(12, 20, "MEDICAL_RECORD_NUMBER")]}
        ),
        (
            "Employee CEL-00142 reported the incident",
            {"entities": [(9, 18, "EMPLOYEE_ID")]}
        ),
        (
            "Policy holder number PLH-2024-99887 filed a claim",
            # "PLH-2024-99887" starts at character 21, not 22
            {"entities": [(21, 35, "POLICY_NUMBER")]}
        ),
        (
            "The customer with loyalty ID LYL-A1B2C3 requested data export",
            {"entities": [(29, 39, "LOYALTY_ID")]}
        ),
    ]
    return training_data


def train_custom_ner(
    base_model: str = "en_core_web_lg",
    training_data: list = None,
    n_iter: int = 30,
    output_dir: str = "./custom_ner_model"
):
    """
    Fine-tune spaCy NER model with custom PII entity types.

    Args:
        base_model: Base spaCy model to fine-tune
        training_data: List of (text, annotations) tuples
        n_iter: Number of training iterations
        output_dir: Directory to save trained model
    """
    if training_data is None:
        training_data = create_training_data()
    # Copy so that shuffling does not reorder the caller's list
    training_data = list(training_data)
    nlp = spacy.load(base_model)
    # Get or create NER pipe
    if "ner" not in nlp.pipe_names:
        ner = nlp.add_pipe("ner", last=True)
    else:
        ner = nlp.get_pipe("ner")
    # Add custom entity labels
    custom_labels = set()
    for _, annotations in training_data:
        for ent in annotations.get("entities", []):
            custom_labels.add(ent[2])
    for label in custom_labels:
        ner.add_label(label)
    # Train only the NER component; select_pipes is the spaCy v3 replacement
    # for the deprecated disable_pipes.
    optimizer = nlp.resume_training()
    with nlp.select_pipes(enable="ner"):
        for iteration in range(n_iter):
            random.shuffle(training_data)
            losses = {}
            for text, annotations in training_data:
                doc = nlp.make_doc(text)
                example = Example.from_dict(doc, annotations)
                nlp.update([example], drop=0.35, sgd=optimizer, losses=losses)
            if iteration % 10 == 0:
                print(f"Iteration {iteration}, Losses: {losses}")
    nlp.to_disk(output_dir)
    return nlp
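Hand-counted character offsets in NER training data are easy to get wrong by one, and a misaligned span does not train the entity it was meant to. One way to avoid this, sketched here with hypothetical helper names `annotate` and `check_offsets`, is to derive the offsets from the literal substring and to verify existing annotations by slicing.

```python
def annotate(text: str, spans: list[tuple[str, str]]) -> tuple[str, dict]:
    """Build a (text, annotations) tuple, computing offsets from literal substrings.

    Uses the first occurrence of each substring, which is an assumption that
    holds only when the entity text appears once in the sentence.
    """
    entities = []
    for substring, label in spans:
        start = text.find(substring)
        if start == -1:
            raise ValueError(f"{substring!r} not found in {text!r}")
        entities.append((start, start + len(substring), label))
    return text, {"entities": entities}


def check_offsets(example: tuple[str, dict], expected: list[str]) -> bool:
    """Return True if every annotated span slices out the expected substring."""
    text, annotations = example
    sliced = [text[start:end] for start, end, _ in annotations["entities"]]
    return sliced == expected
```

Running `check_offsets` over the whole training set before training is a cheap guard against silent label loss.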
S3 Buckets --> Macie Classification Jobs --> Findings --> EventBridge --> Lambda
                                                |                           |
                                                v                           v
                                          Security Hub                 Remediation
                                          (centralized)                - Tag sensitive
                                                                       - Encrypt
                                                                       - Notify owner
                                                                       - Quarantine
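The Lambda step at the end of this flow can be sketched as a pure function that maps a finding event to the remediation actions listed in the diagram. The event shape used here (a `detail` object carrying `severity.description` and `resourcesAffected`) is an assumption based on the Macie finding format, and the action names and severity rule are hypothetical; a real handler would go on to call S3 and a notification service.

```python
def plan_remediation(event: dict) -> dict:
    """Map a Macie finding event to remediation actions (assumed event shape)."""
    detail = event.get("detail", {})
    severity = detail.get("severity", {}).get("description", "unknown")
    resources = detail.get("resourcesAffected", {})
    bucket = resources.get("s3Bucket", {}).get("name", "unknown")
    key = resources.get("s3Object", {}).get("key", "unknown")

    # Every finding is tagged and reported; only high severity triggers the
    # more disruptive steps. This rule is illustrative, not a recommendation.
    actions = ["tag_sensitive", "notify_owner"]
    if severity == "High":
        actions += ["encrypt", "quarantine"]
    return {"bucket": bucket, "key": key, "severity": severity, "actions": actions}


def lambda_handler(event, context):
    """Lambda entry point; returns the plan instead of executing it."""
    return plan_remediation(event)
```

Keeping the decision logic separate from the AWS calls makes it testable with plain dictionaries.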
"""
AWS Macie integration for cloud-native PII detection in S3 buckets.
"""
import boto3


class MacieIntegration:
    """
    Configure and manage AWS Macie classification jobs for
    automated PII detection across S3 data stores.
    """

    def __init__(self, region: str = "us-east-1"):
        self.macie_client = boto3.client("macie2", region_name=region)

    def create_classification_job(
        self,
        bucket_name: str,
        job_name: str,
        custom_data_identifiers: list[str] = None,
        schedule: str = "ONE_TIME"
    ) -> str:
        """
        Create a Macie classification job for an S3 bucket.

        Args:
            bucket_name: Target S3 bucket
            job_name: Descriptive job name
            custom_data_identifiers: IDs of custom data identifier resources
            schedule: ONE_TIME or SCHEDULED (a SCHEDULED job also needs a
                scheduleFrequency, which this method does not set)

        Returns:
            Job ID
        """
        job_config = {
            "name": job_name,
            "description": f"PII detection scan for {bucket_name}",
            "jobType": schedule,
            "s3JobDefinition": {
                "bucketDefinitions": [
                    {
                        "accountId": self._get_account_id(),
                        "buckets": [bucket_name]
                    }
                ],
                "scoping": {
                    "includes": {
                        "and": [
                            {
                                "simpleScopeTerm": {
                                    # OBJECT_EXTENSION supports EQ and NE only;
                                    # STARTS_WITH applies to OBJECT_KEY.
                                    "comparator": "EQ",
                                    "key": "OBJECT_EXTENSION",
                                    "values": ["csv", "json", "txt", "pdf", "docx", "xlsx", "parquet"]
                                }
                            }
                        ]
                    }
                }
            },
            "managedDataIdentifierSelector": "ALL",
            "tags": {
                "Team": "Privacy Engineering",
                "Purpose": "PII Detection"
            }
        }
        if custom_data_identifiers:
            job_config["customDataIdentifierIds"] = custom_data_identifiers
        response = self.macie_client.create_classification_job(**job_config)
        return response["jobId"]

    def create_custom_data_identifier(
        self,
        name: str,
        description: str,
        regex: str,
        keywords: list[str] = None,
        maximum_match_distance: int = 50
    ) -> str:
        """
        Create a custom data identifier for organization-specific PII patterns.

        Returns:
            Custom data identifier ID
        """
        params = {
            "name": name,
            "description": description,
            "regex": regex,
            "maximumMatchDistance": maximum_match_distance
        }
        if keywords:
            params["keywords"] = keywords
        response = self.macie_client.create_custom_data_identifier(**params)
        return response["customDataIdentifierId"]

    def _get_account_id(self) -> str:
        """Get the current AWS account ID."""
        sts = boto3.client("sts")
        return sts.get_caller_identity()["Account"]

    def get_findings_summary(self, job_id: str) -> dict:
        """Get summary of findings from a classification job."""
        criteria = {
            "criterion": {
                "classificationDetails.jobId": {
                    "eq": [job_id]
                }
            }
        }
        findings = []
        next_token = None
        # list_findings is paginated and get_findings accepts at most 50 IDs
        # per call, so fetch the details one page at a time.
        while True:
            kwargs = {"findingCriteria": criteria, "maxResults": 50}
            if next_token:
                kwargs["nextToken"] = next_token
            response = self.macie_client.list_findings(**kwargs)
            finding_ids = response.get("findingIds", [])
            if finding_ids:
                details = self.macie_client.get_findings(findingIds=finding_ids)
                findings.extend(details["findings"])
            next_token = response.get("nextToken")
            if not next_token:
                break
        summary = {
            "total_findings": len(findings),
            "severity_counts": {},
            "entity_type_counts": {},
            "affected_objects": []
        }
        for finding in findings:
            severity = finding.get("severity", {}).get("description", "unknown")
            summary["severity_counts"][severity] = (
                summary["severity_counts"].get(severity, 0) + 1
            )
            sensitive_data = finding.get("classificationDetails", {}).get(
                "result", {}
            ).get("sensitiveData", [])
            for sd in sensitive_data:
                category = sd.get("category", "unknown")
                summary["entity_type_counts"][category] = (
                    summary["entity_type_counts"].get(category, 0)
                    + sd.get("totalCount", 0)
                )
            resource = finding.get("resourcesAffected", {}).get("s3Object", {})
            if resource:
                summary["affected_objects"].append(resource.get("key", "unknown"))
        return summary
| Score Range | Confidence Level | Recommended Action |
|---|---|---|
| 0.95 - 1.00 | Very High | Auto-redact |
| 0.80 - 0.94 | High | Auto-redact with logging |
| 0.60 - 0.79 | Medium | Flag for human review |
| 0.40 - 0.59 | Low | Log only, no action |
| 0.00 - 0.39 | Very Low | Ignore |
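The score bands above can be applied in code with a simple threshold ladder. This is a minimal sketch; the function name and action labels are illustrative. It treats each band's lower bound as inclusive, so scores that fall between the printed ranges (for example 0.945) land in the lower band.

```python
def recommended_action(score: float) -> str:
    """Map a confidence score in [0, 1] to the action from the table."""
    if not 0.0 <= score <= 1.0:
        raise ValueError(f"score must be between 0 and 1, got {score}")
    if score >= 0.95:
        return "auto_redact"
    if score >= 0.80:
        return "auto_redact_with_logging"
    if score >= 0.60:
        return "human_review"
    if score >= 0.40:
        return "log_only"
    return "ignore"
```

For example, `recommended_action(0.85)` returns `"auto_redact_with_logging"`, matching the second row of the table.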