大数据调度系统 azkaban 学习备注

目前在使用 Spark 来处理数据,因为不同的任务之间具有依赖关系, corntab满足不了.所以决定使用 azkaban 的进行任务调度.

学习文档

azkaban 简单介绍与使用

azkaban 官网

azkaban调度 spark 任务

azkaban 最全的一个教程

  • 配置邮件
  • 配置时区

注意启动和停止 azkaban 都需要在 使用bin/start-solo.sh,而不能进到 bin 目录里面直接
执行./start-solo.sh.坑爹.这里耦合的比较严重,是一个坑.注意!!!

动态传参

写了一个 spark清洗脚本,需要清理具体日期的日志.这里需要动态的传入时间进去.

1
2
3
4
type=command
date=2018-06-23
command=spark-submit --master yarn /opt/appl/sparkLogAnalysis/elasticsearch-spark.py ${date}
notify.emails=xxx@xxx.com,xx@xxx.com # 成功失败后发送邮件

这里通过引用的方式传入时间参数,这样在 azkaban的 web 界面上就可以直接编辑参数

这里涉及到 azkaban 一个比较复杂的传参,通过 job 自身支持的参数替换来作为 shell脚本的入参,然后在 shell 脚板中通过$1,$2的方式来引用

1
2
3
4
5
6
7
8
type=command
host=http://host:port
index=logstash-2018*
doc_type=mbpgtw-interface
date=2018-06-26
limit=10000
command=bash /opt/appl/sparkLogAnalysis/load_data_from_es.sh ${host} ${index} ${doc_type} ${date} ${limit}
notify.emails=zhangjun@iboxpay.com,wuze@iboxpay.com

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#!/usr/bin/env bash

elasticdump \
--input=$1/$2/$3 \
--output=/opt/appl/es_data/mbpgtw-interface-$4.json \
--limit $5 \
--sourceOnly true \
--searchBody '{"query": {
"range": {
"@timestamp": {
"gte": "'$4' 00:00:00", #在字符串中要把$4用单引号在包一层来达到替换的目的
"lte": "'$4' 23:59:59",
"format":"yyyy-MM-dd HH:mm:ss",
"time_zone":"+08:00"
}
}
}
}'

这里如果需要动态的更改时间的话,是这样做的.时间(date)参数就不要在 job 里面定义.在 shell 脚本里面去弄.具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#!/usr/bin/env bash
yesterday=`date -d "1 days ago" +%Y-%m-%d`
#这里注意不是单引号,而是键盘波浪号下面的符号,应该不是中文符号, MarkDown 里面行间代码也是这个符号
elasticdump \
--input=$1/$2/$3 \
--output=/opt/appl/es_data/$3-$yesterday.json \
--limit $4 \
--sourceOnly true \
--searchBody '{"query": {
"range": {
"@timestamp": {
"gte": "'$yesterday' 00:00:00",
"lte": "'$yesterday' 23:59:59",
"format":"yyyy-MM-dd HH:mm:ss",
"time_zone":"+08:00"
}
}
}
}'

shell 传参可以参考这个

公共参数

可以定义一个system.properties文件,里面定义key-value.

这样多个 job 之间可以共享system.properties里面定义的字段.一样也是通过${xxx}的方式在 job 文件中引用.

一些注意事项

  • 一个flow的email属性,只会取最后一个job的配置,其他的job的email配置将会被忽略
    java.lang.UnsupportedClassVersionError: azkaban/soloserver/AzkabanSingleServer : Unsupported major.minor version 52.0 出现这种错误,是 Java 版本太低导致的,更新到java8 即可