Flink — DataStream API
Main topics:
- DataSource
- DataStream Transformations
- DataSink
For the overall development workflow of a Flink program and a complete example, see: Flink——从零搭建Flink应用 (building a Flink application from scratch).
DataSource
A DataSource is where a Flink program reads its input data. Sources are attached through methods on StreamExecutionEnvironment, e.g. StreamExecutionEnvironment.addSource(sourceFunction).
Built-in data sources
- File-based sources:
  - readTextFile(path): reads a text file line by line;
  - readFile(fileInputFormat, path): reads a file using the given file input format;
  - readFile(fileInputFormat, path, watchType, interval, pathFilter): additionally takes a watch mode, the interval at which the path is re-scanned for changes, and a filter on file paths. watchType has two modes:
    - FileProcessingMode.PROCESS_CONTINUOUSLY: periodically monitors the path; when a file is modified, its entire contents are re-read into Flink, so this mode cannot guarantee exactly-once semantics;
    - FileProcessingMode.PROCESS_ONCE: scans the path once, reads the data present at that moment, and then exits; this mode is compatible with exactly-once semantics.
- Socket-based sources:
  - socketTextStream(hostname, port): reads text data from a socket;
- Collection-based sources:
  - fromCollection(Seq)
  - fromCollection(Iterator)
  - fromElements(elements: _*)
  - fromParallelCollection(SplittableIterator)
  - generateSequence(from, to)
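A minimal sketch combining the built-in sources above. The file path and socket port are placeholders, and the Flink Scala API dependency is assumed to be on the classpath:

```scala
import org.apache.flink.streaming.api.scala._

object BuiltInSourcesDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Collection-based source: convenient for local testing.
    val fromSeq: DataStream[Int] = env.fromElements(1, 2, 3)

    // File-based source: reads the file line by line ("/tmp/input.txt" is a placeholder).
    val fromFile: DataStream[String] = env.readTextFile("/tmp/input.txt")

    // Socket-based source: text lines from host:port (9999 is a placeholder).
    val fromSocket: DataStream[String] = env.socketTextStream("localhost", 9999)

    fromSeq.print()
    env.execute("built-in sources demo")
  }
}
```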
External data sources
For streaming applications, most data comes from external third-party systems. Flink ships a rich set of third-party connectors (e.g. Kafka) built on the SourceFunction interface, and custom sources can be defined by implementing SourceFunction as well.
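A sketch of the SourceFunction contract mentioned above. CounterSource is a hypothetical example, not a built-in connector; a real connector would also integrate with checkpointing:

```scala
import org.apache.flink.streaming.api.functions.source.SourceFunction

// Emits an increasing counter once per second until cancel() is called.
class CounterSource extends SourceFunction[Long] {
  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    var count = 0L
    while (running) {
      ctx.collect(count) // push one record downstream
      count += 1
      Thread.sleep(1000)
    }
  }

  // Called by Flink on shutdown; must make run() return.
  override def cancel(): Unit = running = false
}
```

It would be attached with env.addSource(new CounterSource).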
DataStream Transformations
| Operator | Transformation | Example |
|---|---|---|
| map | DataStream → DataStream | dataStream.map { x => x * 2 } |
| flatMap | DataStream → DataStream | dataStream.flatMap { str => str.split(" ") } |
| filter | DataStream → DataStream | dataStream.filter { _ != 0 } |
| keyBy | DataStream → KeyedStream | dataStream.keyBy("someKey") // Key by field "someKey" dataStream.keyBy(0) // Key by the first element of a Tuple |
| reduce | KeyedStream → DataStream | keyedStream.reduce { _ + _ } |
| fold | KeyedStream → DataStream | val result: DataStream[String] = keyedStream.fold("start")((str, i) => { str + "-" + i }) |
| aggregations | KeyedStream → DataStream | keyedStream.sum(0) keyedStream.sum("key") keyedStream.min(0) keyedStream.min("key") keyedStream.max(0) keyedStream.max("key") keyedStream.minBy(0) keyedStream.minBy("key") keyedStream.maxBy(0) keyedStream.maxBy("key") |
| window | KeyedStream → WindowedStream | dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data |
| windowAll | DataStream → AllWindowedStream | dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data |
| Window Apply | WindowedStream → DataStream AllWindowedStream → DataStream | windowedStream.apply { WindowFunction } // applying an AllWindowFunction on non-keyed window stream allWindowedStream.apply { AllWindowFunction } |
| Window Reduce | WindowedStream → DataStream | windowedStream.reduce { _ + _ } |
| Window Fold | WindowedStream → DataStream | val result: DataStream[String] = windowedStream.fold("start", (str, i) => { str + "-" + i }) |
| Aggregations on windows | WindowedStream → DataStream | windowedStream.sum(0) windowedStream.sum("key") windowedStream.min(0) windowedStream.min("key") windowedStream.max(0) windowedStream.max("key") windowedStream.minBy(0) windowedStream.minBy("key") windowedStream.maxBy(0) windowedStream.maxBy("key") |
| union | DataStream* → DataStream | dataStream.union(otherStream1, otherStream2, ...) |
| Window Join | DataStream,DataStream → DataStream | dataStream.join(otherStream) .where(&lt;key selector&gt;).equalTo(&lt;key selector&gt;) .window(TumblingEventTimeWindows.of(Time.seconds(3))) .apply { ... } |
| Window CoGroup | DataStream,DataStream → DataStream | dataStream.coGroup(otherStream) .where(0).equalTo(1) .window(TumblingEventTimeWindows.of(Time.seconds(3))) .apply {} |
| connect | DataStream,DataStream → ConnectedStreams | someStream : DataStream[Int] = ... otherStream : DataStream[String] = ... val connectedStreams = someStream.connect(otherStream) |
| CoMap, CoFlatMap | ConnectedStreams → DataStream | connectedStreams.map( (_ : Int) => true, (_ : String) => false ) connectedStreams.flatMap( (_ : Int) => true, (_ : String) => false ) |
| split | DataStream → SplitStream | val split = someDataStream.split( (num: Int) => (num % 2) match { case 0 => List("even") case 1 => List("odd") } ) |
| select | SplitStream → DataStream | val even = split select "even" val odd = split select "odd" val all = split.select("even","odd") |
| iterate | DataStream → IterativeStream → DataStream | initialStream.iterate { iteration => { val iterationBody = iteration.map { /* do something */ } (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0)) } } |
| Extract Timestamps | DataStream → DataStream | stream.assignTimestamps { timestampExtractor } |
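The table above can be tied together in a small pipeline. The sketch below chains flatMap, filter, map, keyBy, window, and an aggregation into a windowed word count; the socket port is a placeholder:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object WindowedWordCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val counts = env.socketTextStream("localhost", 9999)   // source
      .flatMap(_.toLowerCase.split("\\s+"))                // line -> words
      .filter(_.nonEmpty)                                  // drop empty tokens
      .map((_, 1))                                         // word -> (word, 1)
      .keyBy(_._1)                                         // partition by word
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .sum(1)                                              // aggregate counts per window

    counts.print()
    env.execute("windowed word count")
  }
}
```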
DataSink
After the various transformations, a final result stream is produced. Typically it needs to be written to external storage or forwarded to downstream message middleware; in Flink, emitting a DataStream to an external system is called a DataSink operation. Sinks are attached through methods on the DataStream itself, e.g. dataStream.addSink(sinkFunction).
Built-in data sinks
- writeAsText() / TextOutputFormat
- writeAsCsv(...) / CsvOutputFormat
- print() / printToErr()
- writeUsingOutputFormat() / FileOutputFormat
- writeToSocket
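A sketch of the built-in sinks above; the output paths are placeholders, and for production jobs a fault-tolerant connector sink attached via addSink would usually be preferred over the simple write* methods:

```scala
import org.apache.flink.streaming.api.scala._

object SinkDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream: DataStream[(String, Int)] = env.fromElements(("a", 1), ("b", 2))

    // File sinks (paths are placeholders).
    stream.writeAsText("/tmp/flink-out")     // TextOutputFormat
    stream.writeAsCsv("/tmp/flink-out.csv")  // CsvOutputFormat

    // Standard-out sink, useful for debugging.
    stream.print()

    env.execute("sink demo")
  }
}
```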