📦 senior-data-engineer — 高级数据工程师

v0.1.0

面向国内开发者的高级数据工程技能,专注于构建可扩展的数据管道、ETL/ELT系统、实时流处理和数据基础设施。涵盖Python、SQL、Spark、Airflow、dbt、Kafka、Flink、Kinesis及现代数据栈,包含数据建模、管道编排、数据质量、流处理质量监控和DataOps。适用于设计数据架构、构建批处理或流处理数据管道、优化数据工作流及实施数据治理。

0· 31·0 当前·0 累计
wu-uk 头像by @wu-uk·MIT-0
下载技能包
License
MIT-0
最后更新
2026/4/15
0
安全扫描
VirusTotal
无害
查看报告
OpenClaw
可疑
medium confidence
该技能的文件和指令总体上符合高级数据工程师的用途,但存在多个不一致和缺失之处(CLI调用模式损坏、未声明的重型依赖、版本/验证不匹配),使得该包可疑,建议使用前手动审查。
评估建议
该包总体上符合其声明的用途(数据工程工具),但在安装或运行前有几个实际和安全问题需要解决:- 在完全审查之前,不要在生产环境或使用真实凭证运行脚本。示例展示了DB和云凭证(Postgres、Snowflake、AWS、Kafka)的使用,但技能元数据未声明或保护这些密钥。- 仓库缺少脚本所需的许多第三方Python包的安装规范。请重建受控虚拟环境、固定版本并在安装前审计包。- 几个脚本和CLI入口点看起来不一致/有bug(类的构造函数和方法名与main()调用方式不匹配)。预期会出现运行时错误;在sandbox中审查和测试。- 版本/验证不一致(注册表版本0.1.0 vs SKILL.md v2.0.0 vs HOW_TO_USE 1.0.0,来源未知但'verified: true')令人可疑——请发布者提供来源或首选受信任仓库的受控发布版本。- 审查任何被截断/省略的文件(大型stream_processor/streaming_quality_validator文件在清单中被截断)是否有网络调用或隐藏端点后再使用。建议操作:在隔离环境中运行代码,避免提供真实凭证(使用mock/t...
详细分析 ▾
用途与能力
名称、描述、SKILL.md和包含的脚本(pipeline_orchestrator、kafka_config_generator、data_quality_validator、stream_processor等)与高级数据工程技能一致。然而,该包未声明所需的环境变量或安装步骤,而代码和文档展示了大量外部集成(Airflow、Snowflake、S3、Kafka、dbt、Great Expectations),这些在实际中需要凭证和第三方库。此外,注册表元数据/版本(0.1.0)与SKILL.md版本(v2.0.0)和HOW_TO_USE.md(1.0.0)不同,这是不一致的。
指令范围
SKILL.md和参考/模板/文档指导运行连接数据库、S3、Snowflake、Kafka、Airflow变量和其他系统的脚本。指令假设凭证和运行时系统可用,但技能元数据未声明所需的环境变量或配置路径。几个示例片段嵌入了凭证占位符(例如SQL中的AWS_KEY_ID/AWS_SECRET_KEY),并引用Airflow Variable访问(context['var']['value'])和Airflow provider operators。因此,运行时指令需要访问技能未声明的密钥和基础设施,可能导致用户将凭证粘贴到模板中。此外,一些脚本和示例似乎被截断或使用Airflow特定上下文而未明确说明运行时环境要求。
安装机制
这是一个纯指令技能(无安装规范)。但文档和脚本列出了许多重量级Python依赖(apache-airflow、pandas、sqlalchemy、boto3、google-cloud-bigquery、snowflake-connector-python、great-expectations、scikit-learn等),而清单中没有打包的安装说明。这种不匹配意味着用户或agent可能尝试在未安装依赖的情况下运行脚本,或尝试临时安装。缺少这些依赖的声明性、可重现安装机制是可用性和安全性的红旗。
凭证需求
注册表未列出所需的环境变量或凭证,但代码和示例明确需要数据库连接字符串、云凭证和服务令牌(Postgres/Snowflake/S3/Kafka)。考虑到代码预期的重度外部访问,技能未声明主要凭证或所需的环境条目是不成比例的。这个差距意味着agent或用户需要在运行时提供密钥;技能还包含鼓励在代码/模板中嵌入凭证的示例(例如Snowflake CREATE STAGE示例)。
持久化与权限
该技能不请求always:true,不修改其他技能,也没有改变agent范围配置的安装步骤。它是用户可调用的并允许自主调用(平台默认),这是预期的。除了正常执行包含的脚本外,不请求持久权限。
安全有层次,运行前请审查代码。

License

MIT-0

可自由使用、修改和再分发,无需署名。

运行时依赖

无特殊依赖

版本

latestv0.1.02026/4/15

从all-task-skills-dedup批量发布

无害

安装命令

点击复制
官方npx clawhub@latest install flink-query-senior-data-engineer
镜像加速npx clawhub@latest install flink-query-senior-data-engineer --registry https://cn.longxiaskill.com

技能文档

核心能力

  • 批处理管道编排 - 使用Airflow设计并实施生产级ETL/ELT管道,具备智能依赖解析、重试逻辑和全面监控
  • 实时流处理 - 使用Kafka、Flink、Kinesis和Spark Streaming构建事件驱动的流处理管道,具备exactly-once语义和亚秒级延迟
  • 数据质量管理 - 涵盖完整性、准确性、一致性、及时性和有效性的全面批处理和流处理数据质量验证
  • 流处理质量监控 - 跟踪流处理管道的消费者延迟、数据新鲜度、模式漂移、吞吐量和死信队列速率
  • 性能优化 - 通过查询优化、Spark调优和成本分析建议分析和优化管道性能

关键工作流

工作流1:构建ETL管道

时间: 2-4小时

步骤:

  • 使用Lambda、Kappa或Medallion模式设计管道架构
  • 使用YAML配置管道定义(源、转换、目标)
  • 使用pipeline_orchestrator.py生成Airflow DAG
  • 定义数据质量验证规则
  • 部署并配置监控/告警

预期输出: 生产级ETL管道,成功率99%+,自动化质量检查和全面监控

工作流2:构建实时流处理管道

时间: 3-5天

步骤:

  • 根据需求选择流处理架构(Kappa vs Lambda)
  • 配置流处理管道YAML(源、处理、汇、质量)
  • 使用kafka_config_generator.py生成Kafka配置
  • 使用stream_processor.py生成Flink/Spark作业脚手架
  • 使用streaming_quality_validator.py部署和监控

预期输出: 处理10K+事件/秒的流处理管道,P99延迟<1s,exactly-once传递和实时质量监控

面向生产级数据系统、可扩展管道和企业数据平台的世界级数据工程。

概述

本技能通过高级生产模式提供全面的数据工程基础专业知识。从设计Medallion架构到实施实时流处理管道,它涵盖现代数据工程的全部范围,包括ETL/ELT设计、数据质量框架、管道编排和DataOps实践。

本技能提供:

  • 生产级管道模板(Airflow、Spark、dbt)
  • 全面的数据质量验证框架
  • 性能优化和成本分析工具
  • 数据架构模式(Lambda、Kappa、Medallion)
  • 完整的DataOps CI/CD工作流

最适合:

  • 为企业系统构建可扩展数据管道
  • 实施数据质量和治理框架
  • 优化ETL性能和云成本
  • 设计现代数据架构(数据湖、数据仓库、湖仓一体)
  • 生产ML/AI数据基础设施

快速开始

管道编排

# 从配置生成Airflow DAG
python scripts/pipeline_orchestrator.py --config pipeline_config.yaml --output dags/

# 验证管道配置 python scripts/pipeline_orchestrator.py --config pipeline_config.yaml --validate

# 使用增量加载模板 python scripts/pipeline_orchestrator.py --template incremental --output dags/

数据质量验证

# 使用质量检查验证CSV文件
python scripts/data_quality_validator.py --input data/sales.csv --output report.html

# 使用自定义规则验证数据库表 python scripts/data_quality_validator.py \ --connection postgresql://user:pass@host/db \ --table sales_transactions \ --rules rules/sales_validation.yaml \ --threshold 0.95

性能优化

# 分析管道性能并获取建议
python scripts/etl_performance_optimizer.py \
  --airflow-db postgresql://host/airflow \
  --dag-id sales_etl_pipeline \
  --days 30 \
  --optimize

# 分析Spark作业性能 python scripts/etl_performance_optimizer.py \ --spark-history-server http://spark-history:18080 \ --app-id app-20250115-001

实时流处理

# 验证流处理管道配置
python scripts/stream_processor.py --config streaming_config.yaml --validate

# 生成Kafka主题和客户端配置 python scripts/kafka_config_generator.py \ --topic user-events \ --partitions 12 \ --replication 3 \ --output kafka/topics/

# 生成exactly-once生产者配置 python scripts/kafka_config_generator.py \ --producer \ --profile exactly-once \ --output kafka/producer.properties

# 生成Flink作业脚手架 python scripts/stream_processor.py \ --config streaming_config.yaml \ --mode flink \ --generate \ --output flink-jobs/

# 监控流处理质量 python scripts/streaming_quality_validator.py \ --lag --consumer-group events-processor --threshold 10000 \ --freshness --topic processed-events --max-latency-ms 5000 \ --output streaming-health-report.html

核心工作流

1. 构建生产数据管道

步骤:

  • 设计架构: 根据需求选择模式(Lambda、Kappa、Medallion)
  • 配置管道: 创建包含源、转换、目标的YAML配置
  • 生成DAG: python scripts/pipeline_orchestrator.py --config config.yaml
  • 添加质量检查: 定义数据质量验证规则
  • 部署和监控: 部署到Airflow,配置告警,跟踪指标

管道模式: 参见frameworks.md了解Lambda架构、Kappa架构、Medallion架构(Bronze/Silver/Gold)和微服务数据模式。

模板: 参见templates.md获取完整的Airflow DAG模板、Spark作业模板、dbt模型和Docker配置。

2. 数据质量管理

步骤:

  • 定义规则: 创建涵盖完整性、准确性、一致性的验证规则
  • 运行验证: python scripts/data_quality_validator.py --rules rules.yaml
  • 审查结果: 分析质量分数和失败的检查
  • 集成CI/CD: 将验证添加到管道部署流程
  • 监控趋势: 随时间跟踪质量分数

质量框架: 参见frameworks.md获取涵盖所有维度(完整性、准确性、一致性、及时性、有效性)的完整数据质量框架。

验证模板: 参见templates.md获取验证配置示例和Python API用法。

3. 数据建模与转换

步骤:

  • 选择建模方法: 维度建模(Kimball)、Data Vault 2.0或One Big Table
  • 设计模式: 定义事实表、维度和关系
  • 使用dbt实现: 创建暂存、中间和集市模型
  • 处理SCD: 实现缓慢变化维度逻辑(Type 1/2/3)
  • 测试和部署: 运行dbt测试,生成文档,部署

建模模式: 参见frameworks.md获取维度建模(Kimball)、Data Vault 2.0、One Big Table(OBT)和SCD实现。

dbt模板: 参见templates.md获取完整的dbt模型模板,包括暂存、中间、事实表和SCD Type 2逻辑。

4. 性能优化

步骤:

  • 分析管道: 对最近的管道执行运行性能分析器
  • 识别瓶颈: 审查执行时间分解和慢任务
  • 应用优化: 实施建议(分区、索引、批处理)
  • 调优Spark作业: 优化内存、并行度和shuffle设置
  • 衡量影响: 比较前后的指标,跟踪成本节省

优化策略: 参见frameworks.md获取性能最佳实践,包括分区策略、查询优化和Spark调优。

分析工具: 参见tools.md获取关于etl_performance_optimizer.py的完整文档,包括查询分析和Spark调优。

5. 构建实时流处理管道

步骤:

  • 架构选择: 选择Kappa(仅流处理)或Lambda(批处理+流处理)架构
  • 配置管道: 创建包含源、处理引擎、汇、质量阈值的YAML配置
  • 生成Kafka配置: python scripts/kafka_config_generator.py --topic events --partitions 12
  • 生成作业脚手架: python scripts/stream_processor.py --mode flink --generate
  • 部署基础设施: 使用Docker Compose进行本地开发,Kubernetes用于生产
  • 监控质量: python scripts/streaming_quality_validator.py --lag --freshness --throughput

流处理模式: 参见frameworks.md获取状态处理、流连接、窗口化、exactly-once语义和CDC模式。

模板: 参见templates.md获取Flink DataStream作业、Kafka Streams应用程序、PyFlink模板和Docker Compose配置。

Python工具

pipeline_orchestrator.py

具有智能依赖解析和监控的自动化Airflow DAG生成。

主要功能:

  • 从YAML配置生成生产级DAG
  • 自动任务依赖解析
  • 内置重试逻辑和错误处理
  • 多源支持(PostgreSQL、S3、BigQuery、Snowflake)
  • 集成质量检查和告警

用法:

# 基本DAG生成
python scripts/pipeline_orchestrator.py --config pipeline_config.yaml --output dags/

# 带验证 python scripts/pipeline_orchestrator.py --config config.yaml --validate

# 从模板 python scripts/pipeline_orchestrator.py --template incremental --output dags/

完整文档: 参见tools.md获取完整的配置选项、模板和集成示例。

data_quality_validator.py

具有自动化检查和报告的全面数据质量验证框架。

功能:

  • 多维度验证(完整性、准确性、一致性、及时性、有效性)
  • Great Expectations集成
  • 自定义业务规则验证
  • HTML/PDF报告生成
  • 异常检测
  • 历史趋势跟踪

用法:

# 使用自定义规则验证
python scripts/data_quality_validator.py \
  --input data/sales.csv \
  --rules rules/sales_validation.yaml \
  --output report.html

# 数据库表验证 python scripts/data_quality_validator.py \ --connection postgresql://host/db \ --table sales_transactions \ --threshold 0.95

完整文档: 参见tools.md获取规则配置、API用法和集成模式。

etl_performance_optimizer.py

具有可操作优化建议的管道性能分析。

功能:

  • Airflow DAG执行分析
  • 瓶颈检测和分析
  • SQL查询优化建议
  • Spark作业调优建议
  • 成本分析和优化
  • 历史性能趋势

用法:

# 分析Airflow DAG
python scripts/etl_performance_optimizer.py \
  --airflow-db postgresql://host/airflow \
  --dag-id sales_etl_pipeline \
  --days 30 \
  --optimize

# Spark作业分析 python scripts/etl_performance_optimizer.py \ --spark-history-server http://spark-history:18080 \ --app-id app-20250115-001

完整文档: 参见tools.md获取分析选项、优化策略和成本分析。

stream_processor.py

用于Kafka、Flink和Kinesis的流处理管道配置生成器和验证器。

功能:

  • 多平台支持(Kafka、Flink、Kinesis、Spark Streaming)
  • 带有最佳实践检查的配置验证
  • Flink/Spark作业脚手架生成
  • Kafka主题配置生成
  • 用于本地流处理栈的Docker Compose
  • exactly-once语义配置

用法:

# 验证配置
python scripts/stream_processor.py --config streaming_config.yaml --validate

# 生成Kafka配置 python scripts/stream_processor.py --config streaming_config.yaml --mode kafka --generate

# 生成Flink作业脚手架 python scripts/stream_processor.py --config streaming_config.yaml --mode flink --generate --output flink-jobs/

# 生成Docker Compose用于本地开发 python scripts/stream_processor.py --config streaming_config.yaml --mode docker --generate

完整文档: 参见tools.md获取配置格式、验证检查和生成的输出。

streaming_quality_validator.py

具有全面健康评分的实时流处理数据质量监控。

功能:

  • 带阈值的消费者延迟监控
  • 数据新鲜度验证(P50/P95/P99延迟)
  • 模式漂移检测
  • 吞吐量分析(事件/秒、字节/秒)
  • 死信队列速率监控
  • 带有建议的整体质量评分
  • Prometheus指标导出

用法:

# 监控消费者延迟
python scripts/streaming_quality_validator.py \
  --lag --consumer-group events-processor --threshold 10000

# 监控数据新鲜度 python scripts/streaming_quality_validator.py \ --freshness --topic processed-events --max-latency-ms 5000

# 完整质量验证 python scripts/streaming_quality_validator.py \ --lag --freshness --throughput --dlq \ --output streaming-health-report.html

完整文档: 参见tools.md获取所有监控维度和集成模式。

kafka_config_generator.py

具有性能和安全性配置的生产级Kafka配置生成器。

功能:

  • 主题配置(分区、复制、保留、压缩)
  • 生产者配置(高吞吐量、exactly-once、低延迟、有序)
  • 消费者配置(exactly-once、高吞吐量、批处理)
  • 带状态存储调优的Kafka Streams配置
  • 安全性配置(SASL-PLAIN、SASL-SCRAM、mTLS)
  • Kafka Connect源/汇配置
  • 多种输出格式(properties、YAML、JSON)

用法:

# 生成主题配置
python scripts/kafka_config_generator.py \
  --topic user-events --partitions 12 --replication 3 --retention-hours 168

# 生成exactly-once生产者 python scripts/kafka_config_generator.py \ --producer --profile exactly-once --transactional-id producer-001

# 生成Kafka Streams配置 python scripts/kafka_config_generator.py \ --streams --application-id events-processor --exactly-once

完整文档: 参见tools.md获取所有配置、安全性选项和Connect配置。

参考文档

框架 (frameworks.md)

全面的数据工程框架和模式:

  • 架构模式: Lambda、Kappa、Medallion、微服务数据架构
  • 数据建模: 维度建模(Kimball)、Data Vault 2.0、One Big Table
  • ETL/ELT模式: 全量加载、增量加载、CDC、SCD、幂等管道
  • 数据质量: 涵盖所有质量维度的完整框架
  • DataOps: 数据管道的CI/CD、测试策略、监控
  • 编排: Airflow DAG模式、回填策略
  • 实时流处理: 状态处理、流连接、窗口化策略、exactly-once语义、事件时间处理、水印、反压、Apache Flink模式、AWS Kinesis模式、CDC用于流处理
  • 治理: 数据目录、血缘追踪、访问控制

模板 (templates.md)

生产级代码模板和示例:

  • Airflow DAG: 完整ETL DAG、增量加载、动态任务生成
  • Spark作业: 批处理、流处理、优化配置
  • dbt模型: 暂存、中间、事实表、带有SCD Type 2的维度
  • SQL模式: 增量合并(upsert)、去重、日期骨架、窗口函数
  • Python管道: 数据质量验证类、重试装饰器、错误处理
  • 实时流处理: Apache Flink DataStream作业(Java)、Kafka Streams应用程序、PyFlink作业、AWS Kinesis消费者、用于流处理栈的Docker Compose
  • Kafka配置: 生产者/消费者属性模板、主题配置、安全性配置
  • Docker: 数据管道的Dockerfile、用于本地开发的Docker Compose,包括流处理栈(Kafka、Flink、Schema Registry)
  • 配置: dbt项目配置、Spark配置、Airflow变量、流处理管道YAML
  • 测试: pytest fixtures、集成测试、数据质量测试

工具 (tools.md)

Python自动化工具文档:

  • pipeline_orchestrator.py: 完整使用指南、配置格式、DAG模板
  • data_quality_validator.py: 验证规则、维度检查、Great Expectations集成
  • etl_performance_optimizer.py: 性能分析、查询优化、Spark调优
  • stream_processor.py: 流处理管道配置、验证、作业脚手架生成
  • streaming_quality_validator.py: 消费者延迟、数据新鲜度、模式漂移、吞吐量监控
  • kafka_config_generator.py: 主题、生产者、消费者、Kafka Streams和Connect配置
  • 集成模式: Airflow、dbt、CI/CD、监控系统、Prometheus
  • 最佳实践: 配置管理、错误处理、性能、监控、流处理质量

技术栈

核心技术:

  • 语言: Python 3.8+、SQL、Scala(Spark)、Java(Flink)
  • 编排: Apache Airflow、Prefect、Dagster
  • 批处理: Apache Spark、dbt、Pandas
  • 流处理: Apache Kafka、Apache Flink、Kafka Streams、Spark Structured Streaming、AWS Kinesis
  • 存储: PostgreSQL、BigQuery、Snowflake、Redshift、S3、GCS
  • 模式管理: Confluent Schema Registry、AWS Glue Schema Registry
  • 容器化: Docker、Kubernetes
  • 监控: Datadog、Prometheus、Grafana、Kafka UI

数据平台:

  • 云数据仓库: Snowflake、BigQuery、Redshift
  • 数据湖: Delta Lake、Apache Iceberg、Apache Hudi
  • 流处理平台: Apache Kafka、AWS Kinesis、Google Pub/Sub、Azure Event Hubs
  • 流处理引擎: Apache Flink、Kafka Streams、Spark Structured Streaming
  • 工作流: Airflow、Prefect、Dagster

集成点

本技能与以下集成:

  • 编排: Airflow、Prefect、Dagster用于工作流管理
  • 转换: dbt用于SQL转换和测试
  • 质量: Great Expectations用于数据验证
  • 监控: Datadog、Prometheus用于管道监控
  • BI工具: Looker、Tableau、Power BI用于分析
  • ML平台: MLflow、Kubeflow用于ML管道集成
  • 版本控制: Git用于管道代码和配置

参见tools.md获取详细的集成模式和示例。

最佳实践

管道设计:

  • 幂等操作以确保安全重运行
  • 尽可能使用增量处理
  • 清晰的数据血缘和文档
  • 全面的错误处理
  • 自动化恢复机制

数据质量:

  • 尽早定义质量规则
  • 在每个管道阶段验证
  • 自动化质量监控
  • 随时间跟踪质量趋势
  • 阻止坏数据进入下游

性能:

  • 按日期/区域对大表进行分区
  • 使用列式格式(Parquet、ORC)
  • 利用谓词下推
  • 针对查询模式进行优化
  • 定期监控和调优

运维:

  • 对所有内容进行版本控制
  • 自动化测试和部署
  • 实施全面监控
  • 为事件记录运维手册
  • 定期进行性能审查

性能目标

批处理管道执行:

  • P50延迟:<5分钟(每小时管道)
  • P95延迟:<15分钟
  • 成功率:>99%
  • 数据新鲜度:<1小时落后于源

流处理管道执行:

  • 吞吐量:10K+事件/秒持续
  • 端到端延迟:P99<1秒
  • 消费者延迟:<10K条记录落后
  • exactly-once传递:零重复或丢失

数据质量(批处理):

  • 质量分数:>95%
  • 完整性:>99%
  • 及时性:<2小时数据延迟
  • 零关键故障

流处理质量:

  • 数据新鲜度:P95<5分钟从事件生成
  • 延迟数据率:<5%在水印窗口外
  • 死信队列速率:<1%
  • 模式兼容性:100%向后/向前兼容变更

成本效率:

  • 每GB处理成本:<$0.10
  • 云成本趋势:稳定或下降
  • 资源利用率:>70%

资源


版本: 2.0.0

最后更新: 2025年12月16日

文档结构: 渐进式披露与全面参考

流处理增强: 任务#8 - 添加了实时流处理能力

数据来源ClawHub ↗ · 中文优化:龙虾技能库