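Windows
- Time semantics only take effect when they are combined with window operations
- The Table API and SQL provide two main kinds of windows:
Group Windows
- Aggregate rows into finite groups based on time or row-count intervals, and evaluate the aggregate function once per group
Over Windows
- For every input row, compute an aggregate over a range of neighboring rows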
Group Windows
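- Group windows are defined with the window(w: GroupWindow) clause and must be given an alias with the as clause
- To group a table by window, the window alias must be referenced in the groupBy clause, just like a regular grouping field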
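Table table = input
    .window([GroupWindow w].as("w"))  // define the window with alias "w"; [GroupWindow w] is a placeholder for a Tumble/Slide/Session window
    .groupBy("w, a")                  // group by window w and field a
    .select("a, b.sum");              // aggregate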
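- The Table API provides a set of predefined Window classes with specific semantics; these are translated into window operations on the underlying DataStream or DataSet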
Tumbling windows
- Tumbling windows are defined with the Tumble class
// Tumbling event-time window
.window(Tumble.over("10.minutes").on("rowtime").as("w"))
// Tumbling processing-time window
.window(Tumble.over("10.minutes").on("proctime").as("w"))
// Tumbling row-count window
.window(Tumble.over("10.rows").on("proctime").as("w"))
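To make the window boundaries concrete, here is a plain-Java sketch (no Flink dependency; the class and method names are hypothetical) that assigns event timestamps to tumbling windows and counts rows per (key, window) pair, which is roughly what groupBy("w, a") does with the window alias. It assumes windows aligned to multiples of the size, with no offset.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch, not Flink code: tumbling-window assignment by timestamp.
class TumblingWindowSketch {

    // Start of the tumbling window [start, start + size) that contains ts.
    static long windowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    // Counts rows per "key@windowStart", i.e. the window is used like a grouping field.
    static Map<String, Integer> countPerKeyAndWindow(String[] keys, long[] ts, long size) {
        Map<String, Integer> counts = new TreeMap<>();
        for (int i = 0; i < keys.length; i++) {
            String group = keys[i] + "@" + windowStart(ts[i], size);
            counts.merge(group, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long size = 10_000L; // "10.seconds" expressed in milliseconds
        String[] keys = {"sensor_1", "sensor_1", "sensor_2", "sensor_1"};
        long[] ts = {1_000L, 9_999L, 3_000L, 10_000L};
        // prints {sensor_1@0=2, sensor_1@10000=1, sensor_2@0=1}
        System.out.println(countPerKeyAndWindow(keys, ts, size));
    }
}
```

A timestamp exactly on a boundary (10000 here) belongs to the window that starts there, because the end of each window is exclusive.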
Sliding windows
- Sliding windows are defined with the Slide class
// Sliding event-time window
.window(Slide.over("10.minutes").every("5.minutes").on("rowtime").as("w"))
// Sliding processing-time window
.window(Slide.over("10.minutes").every("5.minutes").on("proctime").as("w"))
// Sliding row-count window
.window(Slide.over("10.rows").every("5.rows").on("proctime").as("w"))
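Unlike a tumbling window, a sliding window of size 10 with slide 5 places each row in size / slide = 2 overlapping windows. The plain-Java sketch below (hypothetical names, no Flink dependency, windows aligned to multiples of the slide) lists the start of every window [start, start + size) that contains a given timestamp.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch, not Flink code: which sliding windows contain a timestamp.
class SlidingWindowSketch {

    // Starts of all windows [start, start + size) containing ts, newest first.
    // Assumes size > 0 and slide > 0.
    static List<Long> windowStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, slide); // latest window starting at or before ts
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Slide.over("10.minutes").every("5.minutes"), using minutes for readability
        System.out.println(windowStarts(12, 10, 5)); // [10, 5]
    }
}
```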
Session windows
- Session windows are defined with the Session class
// Session event-time window
.window(Session.withGap("10.minutes").on("rowtime").as("w"))
// Session processing-time window
.window(Session.withGap("10.minutes").on("proctime").as("w"))
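Session windows have no fixed size: a session keeps growing while rows keep arriving within the gap. The plain-Java sketch below (hypothetical names, no Flink dependency) splits sorted timestamps into sessions, starting a new one whenever the distance to the previous row is larger than the gap; each session is reported as {first timestamp, last timestamp + gap}. The exact boundary rule is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch, not Flink code: grouping timestamps into gap-based sessions.
class SessionWindowSketch {

    static List<long[]> sessions(long[] timestamps, long gap) {
        long[] ts = timestamps.clone();
        Arrays.sort(ts);
        List<long[]> result = new ArrayList<>();
        if (ts.length == 0) return result;
        long start = ts[0];
        long last = ts[0];
        for (int i = 1; i < ts.length; i++) {
            if (ts[i] - last > gap) {          // inactivity longer than the gap closes the session
                result.add(new long[]{start, last + gap});
                start = ts[i];
            }
            last = ts[i];
        }
        result.add(new long[]{start, last + gap});
        return result;
    }

    public static void main(String[] args) {
        // Session.withGap("10.minutes"), timestamps in minutes
        for (long[] s : sessions(new long[]{0, 4, 9, 30, 35}, 10)) {
            System.out.println(Arrays.toString(s)); // [0, 19] then [30, 45]
        }
    }
}
```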
Group Windows in SQL
- In SQL, group windows are defined in the GROUP BY clause of the query
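TUMBLE(time_attr, interval)
- Defines a tumbling window; the first argument is the time attribute and the second is the window size
HOP(time_attr, interval, interval)
- Defines a sliding (hopping) window; the first argument is the time attribute, the second is the slide step, and the third is the window size
SESSION(time_attr, interval)
- Defines a session window; the first argument is the time attribute and the second is the session gap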
Example code
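// Imports assume Flink 1.10 (legacy string-based expressions); package names differ in later versions
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Over;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

// TempInfo is a POJO (id, timeStamp, temp) defined elsewhere
public class TableWindowExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // rowtime attributes require the event-time characteristic in these Flink versions
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // each line: id,timestamp(seconds),temperature
        DataStream<String> fileDataStream = env.readTextFile("data/temps.txt");
        DataStream<TempInfo> dataStream = fileDataStream
            .map(new MapFunction<String, TempInfo>() {
                @Override
                public TempInfo map(String value) throws Exception {
                    String[] split = value.split(",");
                    return new TempInfo(split[0], Long.valueOf(split[1]), Double.valueOf(split[2]));
                }
            })
            // watermarks trail the largest timestamp seen so far by 1 second
            .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<TempInfo>(Time.seconds(1)) {
                @Override
                public long extractTimestamp(TempInfo element) {
                    return element.getTimeStamp() * 1000L; // seconds -> milliseconds
                }
            });

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // Alternatives:
        // Table dataTable = tableEnv.fromDataStream(dataStream, "id, timeStamp.rowtime as ts, temp, pt.proctime");
        // Table dataTable = tableEnv.fromDataStream(dataStream, "id, timeStamp.rowtime as ts, temp, rt.rowtime");
        // append an extra rowtime attribute "rt" that carries the assigned event timestamp
        Table dataTable = tableEnv.fromDataStream(dataStream, "id, timeStamp as ts, temp, rt.rowtime");
        // register a view so the table can be queried with SQL
        tableEnv.createTemporaryView("sensor", dataTable);

        // Window operations
        // 1. Group Window
        // Table API: 10-second tumbling event-time window per id
        Table resTable = dataTable.window(Tumble.over("10.seconds").on("rt").as("tw"))
            .groupBy("id, tw")
            .select("id, id.count, temp.avg");
        // SQL
        Table resSqlTable = tableEnv.sqlQuery(
            "SELECT id, count(id) AS cnt, avg(temp) AS avgTemp, tumble_end(rt, interval '10' second) AS wEnd " +
            "FROM sensor GROUP BY id, tumble(rt, interval '10' second)");

        // 2. Over Window
        // Table API: the current row plus the 2 preceding rows of the same id
        Table overTable = dataTable.window(Over.partitionBy("id").orderBy("rt").preceding("2.rows").as("ow"))
            .select("id, rt, id.count over ow, temp.avg over ow");
        // SQL
        Table overSqlTable = tableEnv.sqlQuery(
            "SELECT id, rt, count(id) OVER ow, avg(temp) OVER ow " +
            "FROM sensor " +
            "WINDOW ow AS (PARTITION BY id ORDER BY rt ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)");

        tableEnv.toAppendStream(resTable, Row.class).print("group window");
        tableEnv.toAppendStream(resSqlTable, Row.class).print("resSqlTable");
        tableEnv.toRetractStream(overTable, Row.class).print("over window");
        tableEnv.toAppendStream(overSqlTable, Row.class).print("overSqlTable");

        env.execute();
    }
}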
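As a cross-check of what the group-window query in the example emits, the following plain-Java sketch (hypothetical class name, no Flink dependency) applies the same logic to in-memory lines in the id,timestamp,temp layout that the MapFunction parses: seconds are converted to milliseconds, rows are assigned to 10-second tumbling windows, and count and sum (avg = sum / count) are accumulated per id and window end. The window end is exclusive, matching what tumble_end reports. It ignores watermarks entirely; in the Flink job a window only fires once the watermark passes its end.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch, not Flink code: per id and 10 s tumbling window, count(id) and avg(temp).
class GroupWindowQuerySketch {

    // key "id|windowEnd" -> {count, sumTemp}
    static Map<String, double[]> aggregate(String[] lines, long sizeMillis) {
        Map<String, double[]> acc = new TreeMap<>();
        for (String line : lines) {
            String[] f = line.split(",");
            long tsMillis = Long.parseLong(f[1].trim()) * 1000L; // same conversion as extractTimestamp
            long start = tsMillis - Math.floorMod(tsMillis, sizeMillis);
            long end = start + sizeMillis;                      // exclusive upper bound
            double[] a = acc.computeIfAbsent(f[0].trim() + "|" + end, k -> new double[2]);
            a[0] += 1;
            a[1] += Double.parseDouble(f[2].trim());
        }
        return acc;
    }

    public static void main(String[] args) {
        // made-up sample lines, for illustration only
        String[] lines = {"sensor_1,1,35.8", "sensor_1,11,15.4", "sensor_1,12,6.7", "sensor_2,3,20.0"};
        aggregate(lines, 10_000L).forEach((k, a) ->
            System.out.println(k + " cnt=" + (long) a[0] + " avgTemp=" + a[1] / a[0]));
    }
}
```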
Over Windows
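- Over window aggregation comes from standard SQL (the OVER clause) and can be defined in the SELECT clause of a query
- An over window aggregation computes, for each input row, an aggregate over a range of neighboring rows
- Over windows are defined with the window(w: OverWindow*) clause and referenced by their alias in the select() method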
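Table table = input
    .window([OverWindow w].as("w"))            // [OverWindow w] is a placeholder for an Over window definition
    .select("a, b.sum over w, c.min over w");  // aggregates reference the window by its alias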
- The Table API provides the Over class to configure the properties of an over window
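Unbounded Over Windows
- Over windows can be defined on event time or processing time, over a range given either as a time interval or as a row count
- Unbounded over windows are specified with constants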
// Unbounded event-time over window
.window(Over.partitionBy("a").orderBy("rowtime").preceding(UNBOUNDED_RANGE).as("w"))
// Unbounded processing-time over window
.window(Over.partitionBy("a").orderBy("proctime").preceding(UNBOUNDED_RANGE).as("w"))
// Unbounded event-time row-count over window
.window(Over.partitionBy("a").orderBy("rowtime").preceding(UNBOUNDED_ROW).as("w"))
// Unbounded processing-time row-count over window
.window(Over.partitionBy("a").orderBy("proctime").preceding(UNBOUNDED_ROW).as("w"))
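The difference between UNBOUNDED_RANGE and UNBOUNDED_ROW shows up when several rows share the same time value. Under SQL RANGE semantics, rows with an equal ordering value are peers and are all included, while ROWS counts physical rows only. The plain-Java sketch below (hypothetical names, no Flink dependency; it assumes rows are supplied in time order within each partition) computes a running count both ways.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch, not Flink code: unbounded ROWS vs RANGE running counts.
class UnboundedOverSketch {

    // ROWS: each row sees every earlier row of its partition plus itself.
    static long[] runningCountRows(String[] keys) {
        long[] out = new long[keys.length];
        Map<String, Long> seen = new HashMap<>();
        for (int i = 0; i < keys.length; i++) {
            out[i] = seen.merge(keys[i], 1L, Long::sum);
        }
        return out;
    }

    // RANGE: each row also sees the rows of its partition with the same timestamp (peers).
    static long[] runningCountRange(String[] keys, long[] ts) {
        long[] out = new long[keys.length];
        for (int i = 0; i < keys.length; i++) {
            long count = 0;
            for (int j = 0; j < keys.length; j++) {
                if (keys[j].equals(keys[i]) && ts[j] <= ts[i]) count++;
            }
            out[i] = count;
        }
        return out;
    }

    public static void main(String[] args) {
        String[] keys = {"a", "a", "a", "b"};
        long[] ts = {1L, 2L, 2L, 2L};
        System.out.println(Arrays.toString(runningCountRows(keys)));      // [1, 2, 3, 1]
        System.out.println(Arrays.toString(runningCountRange(keys, ts))); // [1, 3, 3, 1]
    }
}
```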
Bounded Over Windows
- Bounded over windows are specified by the size of the interval
// Bounded event-time over window
.window(Over.partitionBy("a").orderBy("rowtime").preceding("1.minutes").as("w"))
// Bounded processing-time over window
.window(Over.partitionBy("a").orderBy("proctime").preceding("1.minutes").as("w"))
// Bounded event-time row-count over window
.window(Over.partitionBy("a").orderBy("rowtime").preceding("10.rows").as("w"))
// Bounded processing-time row-count over window
.window(Over.partitionBy("a").orderBy("proctime").preceding("10.rows").as("w"))
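A bounded time range such as preceding("1.minutes") aggregates, for each row, the rows of the same partition whose time lies within that interval before it. The plain-Java sketch below (hypothetical names, no Flink dependency, timestamps in milliseconds) sums a value over [ts - range, ts]; treating both ends as inclusive is an assumption of this sketch.

```java
import java.util.Arrays;

// Plain-Java sketch, not Flink code: bounded event-time RANGE over window (sum).
class BoundedRangeOverSketch {

    static double[] rangeSum(String[] keys, long[] ts, double[] values, long rangeMillis) {
        double[] out = new double[keys.length];
        for (int i = 0; i < keys.length; i++) {
            double sum = 0;
            for (int j = 0; j < keys.length; j++) {
                boolean samePartition = keys[j].equals(keys[i]);
                boolean inRange = ts[j] <= ts[i] && ts[j] >= ts[i] - rangeMillis;
                if (samePartition && inRange) sum += values[j];
            }
            out[i] = sum; // one result per input row
        }
        return out;
    }

    public static void main(String[] args) {
        String[] keys = {"a", "a", "a", "b"};
        long[] ts = {0L, 30_000L, 70_000L, 70_000L};
        double[] values = {1, 2, 4, 8};
        // preceding("1.minutes") -> 60_000 ms; prints [1.0, 3.0, 6.0, 8.0]
        System.out.println(Arrays.toString(rangeSum(keys, ts, values, 60_000L)));
    }
}
```

The third row (ts = 70000) no longer sees the row at ts = 0, because that row has dropped out of the one-minute range.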
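Over Windows in SQL
- When aggregating with OVER, all aggregates must be defined over the same window, i.e. with the same partitioning, ordering, and range
- Currently only windows that range from preceding rows up to the current row are supported
- ORDER BY must be specified on a single time attribute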
SELECT COUNT(amount) OVER (
    PARTITION BY user
    ORDER BY proctime
    ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
FROM Orders
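ROWS BETWEEN 2 PRECEDING AND CURRENT ROW (the SQL counterpart of preceding("2.rows") in the example code) covers the current row plus at most two earlier rows of the same partition, and every input row produces exactly one output row. The plain-Java sketch below (hypothetical names, no Flink dependency; it assumes rows already arrive ordered by the time attribute) computes count and avg that way with a small per-partition buffer.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch, not Flink code: count and avg over the current row and N preceding rows.
class RowsOverSketch {

    static double[][] countAndAvg(String[] keys, double[] values, int preceding) {
        double[][] out = new double[keys.length][];
        Map<String, Deque<Double>> buffers = new HashMap<>();
        for (int i = 0; i < keys.length; i++) {
            Deque<Double> buf = buffers.computeIfAbsent(keys[i], k -> new ArrayDeque<>());
            buf.addLast(values[i]);
            if (buf.size() > preceding + 1) buf.removeFirst(); // keep current row + `preceding` rows
            double sum = 0;
            for (double v : buf) sum += v;
            out[i] = new double[]{buf.size(), sum / buf.size()}; // {count, avg}
        }
        return out;
    }

    public static void main(String[] args) {
        String[] keys = {"s1", "s1", "s1", "s1", "s2"};
        double[] temps = {10, 20, 30, 40, 5};
        // prints one line per input row: 1 10.0, 2 15.0, 3 20.0, 3 30.0, 1 5.0
        for (double[] r : countAndAvg(keys, temps, 2)) {
            System.out.println((long) r[0] + " " + r[1]);
        }
    }
}
```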