详细分析 ▾
运行时依赖
版本
从all-task-skills-dedup批量发布
安装命令
点击复制技能文档
核心能力
- 批处理管道编排 - 使用Airflow设计并实施生产级ETL/ELT管道,具备智能依赖解析、重试逻辑和全面监控
- 实时流处理 - 使用Kafka、Flink、Kinesis和Spark Streaming构建事件驱动的流处理管道,具备exactly-once语义和亚秒级延迟
- 数据质量管理 - 涵盖完整性、准确性、一致性、及时性和有效性的全面批处理和流处理数据质量验证
- 流处理质量监控 - 跟踪流处理管道的消费者延迟、数据新鲜度、模式漂移、吞吐量和死信队列速率
- 性能优化 - 通过查询优化、Spark调优和成本分析建议分析和优化管道性能
关键工作流
工作流1:构建ETL管道
时间: 2-4小时
步骤:
- 使用Lambda、Kappa或Medallion模式设计管道架构
- 使用YAML配置管道定义(源、转换、目标)
- 使用
pipeline_orchestrator.py生成Airflow DAG - 定义数据质量验证规则
- 部署并配置监控/告警
预期输出: 生产级ETL管道,成功率99%+,自动化质量检查和全面监控
工作流2:构建实时流处理管道
时间: 3-5天
步骤:
- 根据需求选择流处理架构(Kappa vs Lambda)
- 配置流处理管道YAML(源、处理、汇、质量)
- 使用
kafka_config_generator.py生成Kafka配置 - 使用
stream_processor.py生成Flink/Spark作业脚手架 - 使用
streaming_quality_validator.py部署和监控
预期输出: 处理10K+事件/秒的流处理管道,P99延迟<1s,exactly-once传递和实时质量监控
面向生产级数据系统、可扩展管道和企业数据平台的世界级数据工程。
概述
本技能通过高级生产模式提供全面的数据工程基础专业知识。从设计Medallion架构到实施实时流处理管道,它涵盖现代数据工程的全部范围,包括ETL/ELT设计、数据质量框架、管道编排和DataOps实践。
本技能提供:
- 生产级管道模板(Airflow、Spark、dbt)
- 全面的数据质量验证框架
- 性能优化和成本分析工具
- 数据架构模式(Lambda、Kappa、Medallion)
- 完整的DataOps CI/CD工作流
最适合:
- 为企业系统构建可扩展数据管道
- 实施数据质量和治理框架
- 优化ETL性能和云成本
- 设计现代数据架构(数据湖、数据仓库、湖仓一体)
- 生产ML/AI数据基础设施
快速开始
管道编排
# 从配置生成Airflow DAG python scripts/pipeline_orchestrator.py --config pipeline_config.yaml --output dags/# 验证管道配置 python scripts/pipeline_orchestrator.py --config pipeline_config.yaml --validate
# 使用增量加载模板 python scripts/pipeline_orchestrator.py --template incremental --output dags/
数据质量验证
# 使用质量检查验证CSV文件 python scripts/data_quality_validator.py --input data/sales.csv --output report.html
# 使用自定义规则验证数据库表 python scripts/data_quality_validator.py \ --connection postgresql://user:pass@host/db \ --table sales_transactions \ --rules rules/sales_validation.yaml \ --threshold 0.95
性能优化
# 分析管道性能并获取建议 python scripts/etl_performance_optimizer.py \ --airflow-db postgresql://host/airflow \ --dag-id sales_etl_pipeline \ --days 30 \ --optimize
# 分析Spark作业性能 python scripts/etl_performance_optimizer.py \ --spark-history-server http://spark-history:18080 \ --app-id app-20250115-001
实时流处理
# 验证流处理管道配置 python scripts/stream_processor.py --config streaming_config.yaml --validate# 生成Kafka主题和客户端配置 python scripts/kafka_config_generator.py \ --topic user-events \ --partitions 12 \ --replication 3 \ --output kafka/topics/
# 生成exactly-once生产者配置 python scripts/kafka_config_generator.py \ --producer \ --profile exactly-once \ --output kafka/producer.properties
# 生成Flink作业脚手架 python scripts/stream_processor.py \ --config streaming_config.yaml \ --mode flink \ --generate \ --output flink-jobs/
# 监控流处理质量 python scripts/streaming_quality_validator.py \ --lag --consumer-group events-processor --threshold 10000 \ --freshness --topic processed-events --max-latency-ms 5000 \ --output streaming-health-report.html
核心工作流
1. 构建生产数据管道
步骤:
- 设计架构: 根据需求选择模式(Lambda、Kappa、Medallion)
- 配置管道: 创建包含源、转换、目标的YAML配置
- 生成DAG:
python scripts/pipeline_orchestrator.py --config config.yaml - 添加质量检查: 定义数据质量验证规则
- 部署和监控: 部署到Airflow,配置告警,跟踪指标
管道模式: 参见frameworks.md了解Lambda架构、Kappa架构、Medallion架构(Bronze/Silver/Gold)和微服务数据模式。
模板: 参见templates.md获取完整的Airflow DAG模板、Spark作业模板、dbt模型和Docker配置。
2. 数据质量管理
步骤:
- 定义规则: 创建涵盖完整性、准确性、一致性的验证规则
- 运行验证:
python scripts/data_quality_validator.py --rules rules.yaml - 审查结果: 分析质量分数和失败的检查
- 集成CI/CD: 将验证添加到管道部署流程
- 监控趋势: 随时间跟踪质量分数
质量框架: 参见frameworks.md获取涵盖所有维度(完整性、准确性、一致性、及时性、有效性)的完整数据质量框架。
验证模板: 参见templates.md获取验证配置示例和Python API用法。
3. 数据建模与转换
步骤:
- 选择建模方法: 维度建模(Kimball)、Data Vault 2.0或One Big Table
- 设计模式: 定义事实表、维度和关系
- 使用dbt实现: 创建暂存、中间和集市模型
- 处理SCD: 实现缓慢变化维度逻辑(Type 1/2/3)
- 测试和部署: 运行dbt测试,生成文档,部署
建模模式: 参见frameworks.md获取维度建模(Kimball)、Data Vault 2.0、One Big Table(OBT)和SCD实现。
dbt模板: 参见templates.md获取完整的dbt模型模板,包括暂存、中间、事实表和SCD Type 2逻辑。
4. 性能优化
步骤:
- 分析管道: 对最近的管道执行运行性能分析器
- 识别瓶颈: 审查执行时间分解和慢任务
- 应用优化: 实施建议(分区、索引、批处理)
- 调优Spark作业: 优化内存、并行度和shuffle设置
- 衡量影响: 比较前后的指标,跟踪成本节省
优化策略: 参见frameworks.md获取性能最佳实践,包括分区策略、查询优化和Spark调优。
分析工具: 参见tools.md获取关于etl_performance_optimizer.py的完整文档,包括查询分析和Spark调优。
5. 构建实时流处理管道
步骤:
- 架构选择: 选择Kappa(仅流处理)或Lambda(批处理+流处理)架构
- 配置管道: 创建包含源、处理引擎、汇、质量阈值的YAML配置
- 生成Kafka配置:
python scripts/kafka_config_generator.py --topic events --partitions 12 - 生成作业脚手架:
python scripts/stream_processor.py --mode flink --generate - 部署基础设施: 使用Docker Compose进行本地开发,Kubernetes用于生产
- 监控质量:
python scripts/streaming_quality_validator.py --lag --freshness --throughput
流处理模式: 参见frameworks.md获取状态处理、流连接、窗口化、exactly-once语义和CDC模式。
模板: 参见templates.md获取Flink DataStream作业、Kafka Streams应用程序、PyFlink模板和Docker Compose配置。
Python工具
pipeline_orchestrator.py
具有智能依赖解析和监控的自动化Airflow DAG生成。
主要功能:
- 从YAML配置生成生产级DAG
- 自动任务依赖解析
- 内置重试逻辑和错误处理
- 多源支持(PostgreSQL、S3、BigQuery、Snowflake)
- 集成质量检查和告警
用法:
# 基本DAG生成 python scripts/pipeline_orchestrator.py --config pipeline_config.yaml --output dags/# 带验证 python scripts/pipeline_orchestrator.py --config config.yaml --validate
# 从模板 python scripts/pipeline_orchestrator.py --template incremental --output dags/
完整文档: 参见tools.md获取完整的配置选项、模板和集成示例。
data_quality_validator.py
具有自动化检查和报告的全面数据质量验证框架。
功能:
- 多维度验证(完整性、准确性、一致性、及时性、有效性)
- Great Expectations集成
- 自定义业务规则验证
- HTML/PDF报告生成
- 异常检测
- 历史趋势跟踪
用法:
# 使用自定义规则验证 python scripts/data_quality_validator.py \ --input data/sales.csv \ --rules rules/sales_validation.yaml \ --output report.html
# 数据库表验证 python scripts/data_quality_validator.py \ --connection postgresql://host/db \ --table sales_transactions \ --threshold 0.95
完整文档: 参见tools.md获取规则配置、API用法和集成模式。
etl_performance_optimizer.py
具有可操作优化建议的管道性能分析。
功能:
- Airflow DAG执行分析
- 瓶颈检测和分析
- SQL查询优化建议
- Spark作业调优建议
- 成本分析和优化
- 历史性能趋势
用法:
# 分析Airflow DAG python scripts/etl_performance_optimizer.py \ --airflow-db postgresql://host/airflow \ --dag-id sales_etl_pipeline \ --days 30 \ --optimize
# Spark作业分析 python scripts/etl_performance_optimizer.py \ --spark-history-server http://spark-history:18080 \ --app-id app-20250115-001
完整文档: 参见tools.md获取分析选项、优化策略和成本分析。
stream_processor.py
用于Kafka、Flink和Kinesis的流处理管道配置生成器和验证器。
功能:
- 多平台支持(Kafka、Flink、Kinesis、Spark Streaming)
- 带有最佳实践检查的配置验证
- Flink/Spark作业脚手架生成
- Kafka主题配置生成
- 用于本地流处理栈的Docker Compose
- exactly-once语义配置
用法:
# 验证配置 python scripts/stream_processor.py --config streaming_config.yaml --validate# 生成Kafka配置 python scripts/stream_processor.py --config streaming_config.yaml --mode kafka --generate
# 生成Flink作业脚手架 python scripts/stream_processor.py --config streaming_config.yaml --mode flink --generate --output flink-jobs/
# 生成Docker Compose用于本地开发 python scripts/stream_processor.py --config streaming_config.yaml --mode docker --generate
完整文档: 参见tools.md获取配置格式、验证检查和生成的输出。
streaming_quality_validator.py
具有全面健康评分的实时流处理数据质量监控。
功能:
- 带阈值的消费者延迟监控
- 数据新鲜度验证(P50/P95/P99延迟)
- 模式漂移检测
- 吞吐量分析(事件/秒、字节/秒)
- 死信队列速率监控
- 带有建议的整体质量评分
- Prometheus指标导出
用法:
# 监控消费者延迟 python scripts/streaming_quality_validator.py \ --lag --consumer-group events-processor --threshold 10000# 监控数据新鲜度 python scripts/streaming_quality_validator.py \ --freshness --topic processed-events --max-latency-ms 5000
# 完整质量验证 python scripts/streaming_quality_validator.py \ --lag --freshness --throughput --dlq \ --output streaming-health-report.html
完整文档: 参见tools.md获取所有监控维度和集成模式。
kafka_config_generator.py
具有性能和安全性配置的生产级Kafka配置生成器。
功能:
- 主题配置(分区、复制、保留、压缩)
- 生产者配置(高吞吐量、exactly-once、低延迟、有序)
- 消费者配置(exactly-once、高吞吐量、批处理)
- 带状态存储调优的Kafka Streams配置
- 安全性配置(SASL-PLAIN、SASL-SCRAM、mTLS)
- Kafka Connect源/汇配置
- 多种输出格式(properties、YAML、JSON)
用法:
# 生成主题配置 python scripts/kafka_config_generator.py \ --topic user-events --partitions 12 --replication 3 --retention-hours 168# 生成exactly-once生产者 python scripts/kafka_config_generator.py \ --producer --profile exactly-once --transactional-id producer-001
# 生成Kafka Streams配置 python scripts/kafka_config_generator.py \ --streams --application-id events-processor --exactly-once
完整文档: 参见tools.md获取所有配置、安全性选项和Connect配置。
参考文档
框架 (frameworks.md)
全面的数据工程框架和模式:
- 架构模式: Lambda、Kappa、Medallion、微服务数据架构
- 数据建模: 维度建模(Kimball)、Data Vault 2.0、One Big Table
- ETL/ELT模式: 全量加载、增量加载、CDC、SCD、幂等管道
- 数据质量: 涵盖所有质量维度的完整框架
- DataOps: 数据管道的CI/CD、测试策略、监控
- 编排: Airflow DAG模式、回填策略
- 实时流处理: 状态处理、流连接、窗口化策略、exactly-once语义、事件时间处理、水印、反压、Apache Flink模式、AWS Kinesis模式、CDC用于流处理
- 治理: 数据目录、血缘追踪、访问控制
模板 (templates.md)
生产级代码模板和示例:
- Airflow DAG: 完整ETL DAG、增量加载、动态任务生成
- Spark作业: 批处理、流处理、优化配置
- dbt模型: 暂存、中间、事实表、带有SCD Type 2的维度
- SQL模式: 增量合并(upsert)、去重、日期骨架、窗口函数
- Python管道: 数据质量验证类、重试装饰器、错误处理
- 实时流处理: Apache Flink DataStream作业(Java)、Kafka Streams应用程序、PyFlink作业、AWS Kinesis消费者、用于流处理栈的Docker Compose
- Kafka配置: 生产者/消费者属性模板、主题配置、安全性配置
- Docker: 数据管道的Dockerfile、用于本地开发的Docker Compose,包括流处理栈(Kafka、Flink、Schema Registry)
- 配置: dbt项目配置、Spark配置、Airflow变量、流处理管道YAML
- 测试: pytest fixtures、集成测试、数据质量测试
工具 (tools.md)
Python自动化工具文档:
- pipeline_orchestrator.py: 完整使用指南、配置格式、DAG模板
- data_quality_validator.py: 验证规则、维度检查、Great Expectations集成
- etl_performance_optimizer.py: 性能分析、查询优化、Spark调优
- stream_processor.py: 流处理管道配置、验证、作业脚手架生成
- streaming_quality_validator.py: 消费者延迟、数据新鲜度、模式漂移、吞吐量监控
- kafka_config_generator.py: 主题、生产者、消费者、Kafka Streams和Connect配置
- 集成模式: Airflow、dbt、CI/CD、监控系统、Prometheus
- 最佳实践: 配置管理、错误处理、性能、监控、流处理质量
技术栈
核心技术:
- 语言: Python 3.8+、SQL、Scala(Spark)、Java(Flink)
- 编排: Apache Airflow、Prefect、Dagster
- 批处理: Apache Spark、dbt、Pandas
- 流处理: Apache Kafka、Apache Flink、Kafka Streams、Spark Structured Streaming、AWS Kinesis
- 存储: PostgreSQL、BigQuery、Snowflake、Redshift、S3、GCS
- 模式管理: Confluent Schema Registry、AWS Glue Schema Registry
- 容器化: Docker、Kubernetes
- 监控: Datadog、Prometheus、Grafana、Kafka UI
数据平台:
- 云数据仓库: Snowflake、BigQuery、Redshift
- 数据湖: Delta Lake、Apache Iceberg、Apache Hudi
- 流处理平台: Apache Kafka、AWS Kinesis、Google Pub/Sub、Azure Event Hubs
- 流处理引擎: Apache Flink、Kafka Streams、Spark Structured Streaming
- 工作流: Airflow、Prefect、Dagster
集成点
本技能与以下集成:
- 编排: Airflow、Prefect、Dagster用于工作流管理
- 转换: dbt用于SQL转换和测试
- 质量: Great Expectations用于数据验证
- 监控: Datadog、Prometheus用于管道监控
- BI工具: Looker、Tableau、Power BI用于分析
- ML平台: MLflow、Kubeflow用于ML管道集成
- 版本控制: Git用于管道代码和配置
参见tools.md获取详细的集成模式和示例。
最佳实践
管道设计:
- 幂等操作以确保安全重运行
- 尽可能使用增量处理
- 清晰的数据血缘和文档
- 全面的错误处理
- 自动化恢复机制
数据质量:
- 尽早定义质量规则
- 在每个管道阶段验证
- 自动化质量监控
- 随时间跟踪质量趋势
- 阻止坏数据进入下游
性能:
- 按日期/区域对大表进行分区
- 使用列式格式(Parquet、ORC)
- 利用谓词下推
- 针对查询模式进行优化
- 定期监控和调优
运维:
- 对所有内容进行版本控制
- 自动化测试和部署
- 实施全面监控
- 为事件记录运维手册
- 定期进行性能审查
性能目标
批处理管道执行:
- P50延迟:<5分钟(每小时管道)
- P95延迟:<15分钟
- 成功率:>99%
- 数据新鲜度:<1小时落后于源
流处理管道执行:
- 吞吐量:10K+事件/秒持续
- 端到端延迟:P99<1秒
- 消费者延迟:<10K条记录落后
- exactly-once传递:零重复或丢失
数据质量(批处理):
- 质量分数:>95%
- 完整性:>99%
- 及时性:<2小时数据延迟
- 零关键故障
流处理质量:
- 数据新鲜度:P95<5分钟从事件生成
- 延迟数据率:<5%在水印窗口外
- 死信队列速率:<1%
- 模式兼容性:100%向后/向前兼容变更
成本效率:
- 每GB处理成本:<$0.10
- 云成本趋势:稳定或下降
- 资源利用率:>70%
资源
- 框架指南: references/frameworks.md
- 代码模板: references/templates.md
- 工具文档: references/tools.md
- Python脚本:
scripts/目录
版本: 2.0.0
最后更新: 2025年12月16日
文档结构: 渐进式披露与全面参考
流处理增强: 任务#8 - 添加了实时流处理能力