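Pilot Map Reduce: Distributed MapReduce
v1.0.0. Implements distributed MapReduce over a swarm of agents for parallel data processing. Use it when you need to process a large dataset across multiple worker nodes, run a parallel map phase followed by an aggregating reduce phase, or run parallel tasks that include a combine step.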
Version
Initial release
Skill documentation
Implements the distributed map-reduce pattern over a swarm of agents for parallel data processing.
Commands
Submit map tasks to workers
# JOB_ID must already be set. Look up the mapper peers once, then send each one its chunk.
PEERS=$(pilotctl --json peers --search "role:mapper")
TOTAL_WORKERS=$(echo "$PEERS" | jq 'length')
for i in $(seq 0 $((TOTAL_WORKERS - 1))); do
  WORKER=$(echo "$PEERS" | jq -r ".[$i].address")
  # Worker i gets the range from i * 1000 to (i + 1) * 1000
  pilotctl --json send-message "$WORKER" \
    --data "{\"type\":\"map_task\",\"job_id\":\"$JOB_ID\",\"chunk_start\":$((i * 1000)),\"chunk_end\":$(((i + 1) * 1000))}"
done
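The chunk arithmetic above assumes every worker gets a full 1000-item range. As a minimal sketch of how the bounds could be clamped when the dataset is not an exact multiple of the chunk size, the helper below computes `chunk_start` and `chunk_end` for one worker. The function name `chunk_bounds`, the total item count, and the treatment of the end offset as exclusive are all assumptions for illustration; they are not part of the skill.

```shell
#!/bin/bash
# Hypothetical helper (not part of pilotctl): print "start end" for one worker.
#   $1 = worker index, $2 = chunk size, $3 = total number of items
# The end offset is assumed to be exclusive and is clamped to the total.
chunk_bounds() {
  local index=$1 size=$2 total=$3
  local start=$((index * size))
  local end=$(((index + 1) * size))
  if [ "$start" -gt "$total" ]; then start=$total; fi
  if [ "$end" -gt "$total" ]; then end=$total; fi
  echo "$start $end"
}

# Example: 3 workers, 1000 items per chunk, 2500 items in total
for i in 0 1 2; do
  read -r start end <<< "$(chunk_bounds "$i" 1000 2500)"
  echo "worker $i: chunk_start=$start chunk_end=$end"
done
```

With these inputs the last worker receives the shorter range 2000 to 2500 instead of running past the end of the data.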
Collect map results
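EXPECTED_RESULTS=$TOTAL_WORKERS
RECEIVED=0
# Poll the inbox once per second until every mapper has reported for this job
while [ "$RECEIVED" -lt "$EXPECTED_RESULTS" ]; do
  # --arg passes JOB_ID into jq safely instead of splicing it into the filter
  RECEIVED=$(pilotctl --json received \
    | jq --arg job "$JOB_ID" '[.messages[] | select(.payload.type == "map_result" and .payload.job_id == $job)] | length')
  sleep 1
done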
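The polling loop above never gives up, so a single crashed mapper would block the job forever. One possible way to bound the wait is sketched below. `wait_for_results` and `fake_count` are hypothetical names; in a real job the counting command would wrap the `pilotctl --json received | jq ... | length` pipeline shown above.

```shell
#!/bin/bash
# Hypothetical helper: wait_for_results EXPECTED TIMEOUT_SECONDS COUNT_COMMAND...
# Runs COUNT_COMMAND (which must print an integer) once per second until it
# reports at least EXPECTED results. Returns 0 on success, 1 on timeout.
wait_for_results() {
  local expected=$1 timeout=$2
  shift 2
  local deadline=$((SECONDS + timeout))
  local received=0
  while :; do
    received=$("$@")
    if [ "$received" -ge "$expected" ]; then return 0; fi
    if [ "$SECONDS" -ge "$deadline" ]; then return 1; fi
    sleep 1
  done
}

# Stand-in counter for demonstration only; it always reports 3 results.
fake_count() { echo 3; }

if wait_for_results 3 10 fake_count; then
  echo "all map results received"
else
  echo "timed out waiting for mappers"
fi
```

On timeout the caller can decide whether to resubmit the missing chunks or to reduce over the partial results.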
Shuffle and reduce
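# Assumes the collected map_result messages were saved to this file as a JSON array,
# and that REDUCER holds the address of the reducer peer.
MAP_RESULTS=$(cat "/tmp/map-results-$JOB_ID.json")
# Shuffle: list every distinct key emitted by any mapper (keys must not contain whitespace)
KEYS=$(echo "$MAP_RESULTS" | jq -r '.[].payload.results | to_entries | .[].key' | sort -u)
for key in $KEYS; do
  # Gather the values for this key from all mappers into one compact JSON array
  VALUES=$(echo "$MAP_RESULTS" | jq -c --arg k "$key" '[.[].payload.results[$k] // empty] | flatten')
  pilotctl --json send-message "$REDUCER" \
    --data "{\"type\":\"reduce_task\",\"job_id\":\"$JOB_ID\",\"key\":\"$key\",\"values\":$VALUES}"
done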
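The shuffle step groups every value emitted for a key so that one reducer call sees them all. The sketch below shows the same grouping on plain `key value` lines, without pilotctl or jq, which can help when checking the logic locally. The `shuffle` function and the line-based input format are assumptions for illustration, and the associative array requires bash 4 or newer.

```shell
#!/bin/bash
# Hypothetical helper: read "key value" lines on stdin and print one
# "key v1 v2 ..." line per distinct key, sorted by key.
# Requires bash 4+ (associative arrays).
shuffle() {
  local -A groups=()
  local key value
  while read -r key value; do
    [ -n "$key" ] || continue
    # Append the value, adding a separating space after the first one
    groups[$key]+="${groups[$key]:+ }$value"
  done
  for key in "${!groups[@]}"; do
    echo "$key ${groups[$key]}"
  done | sort
}

# Example: outputs from two mappers, interleaved
printf 'the 2\ncat 1\nthe 3\n' | shuffle
```

Each output line corresponds to one `reduce_task` message in the commands above: the first field is the key and the rest are its values.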
Workflow example
Word count across a distributed text corpus:
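#!/bin/bash
JOB_ID="wordcount-$(date +%s)"

# MAP phase: read the mapper addresses into a bash array, one per line
mapfile -t MAPPERS < <(pilotctl --json peers --search "role:mapper" | jq -r '.[].address')
# Send chunk i to mapper i; the sends run in parallel
for i in "${!MAPPERS[@]}"; do
  pilotctl --json send-message "${MAPPERS[$i]}" \
    --data "{\"type\":\"map_task\",\"job_id\":\"$JOB_ID\",\"chunk\":$i}" &
done
wait

# REDUCE phase
sleep 5  # fixed pause to let mappers reply; a polling loop is more reliable
MAP_RESULTS=$(pilotctl --json received \
  | jq --arg job "$JOB_ID" '[.messages[] | select(.payload.type == "map_result" and .payload.job_id == $job)]')
# Sum the counts per word; merging objects with "add" would keep only the last count
FINAL=$(echo "$MAP_RESULTS" | jq 'reduce .[] as $m ({}; .[$m.payload.word] += $m.payload.count)')
echo "$FINAL"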
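To check the expected map and reduce behavior before involving any peers, the word count can be rehearsed locally with standard tools. This is only a sketch of what a mapper and a reducer might compute; `map_chunk` and `reduce_counts` are hypothetical names, and the actual mapper implementation on the workers is not specified by this skill.

```shell
#!/bin/bash
# Hypothetical mapper: emit "word 1" for every word on stdin
# (non-letters act as separators, words are lowercased).
map_chunk() {
  tr -cs '[:alpha:]' '\n' | tr '[:upper:]' '[:lower:]' | awk 'NF { print $1, 1 }'
}

# Hypothetical reducer: sum the counts per word from "word count" lines.
reduce_counts() {
  awk '{ total[$1] += $2 } END { for (w in total) print w, total[w] }' | sort
}

# Two chunks processed independently, then reduced together
{
  echo "the cat sat" | map_chunk   # chunk 0
  echo "The cat ran" | map_chunk   # chunk 1
} | reduce_counts
```

The counts for "the" and "cat" come from both chunks and are added together, which is the behavior the distributed reduce phase needs to reproduce.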
Dependencies
Requires the pilot-protocol skill, jq, and sort.