
Introduction to Structured Streaming

 


Structured Streaming is one of the APIs Spark provides to address stream processing needs. It is built on the familiar Structured APIs, so the data is treated as a table, with one important distinction: it is a table to which data is continuously appended. As seen in the previous article, because it is built on the Structured API we can run SQL queries over our data, so the technical effort required to express operations on the data is low. Much of the complexity surrounding stream processing is abstracted away, and we are left with configuring and expressing operations at a high level. Still, there are concepts of Structured Streaming that one has to be familiar with before starting to work with it. In this article, I would like to provide a basic description of the landscape, which will be explored further in the coming articles.


For those who are familiar with the Structured APIs and their concepts, there is not much new to learn in Structured Streaming; most of the concepts are inherited from the Structured APIs. To start with, transformations and actions mean and apply almost the same in streaming as well. The minor distinction is that not all operations available on the Structured APIs are available in Structured Streaming. There are some restrictions on which transformations are possible on streaming data, usually because no efficient incremental execution plan exists for such operations. That does not mean there is no effort to support them: as Spark evolves, support is being extended to transformations that were previously not possible on streaming data. When it comes to actions, there is only one action available in Structured Streaming, and its purpose is to start the stream processing. The defined computation then runs continuously, one increment at a time.
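As a rough sketch of how this looks in PySpark, the snippet below declares a transformation on a streaming DataFrame and then calls start(), the single streaming action. The input directory and the schema are made up for illustration.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("structured-streaming-intro").getOrCreate()

# Read a directory of JSON files as an unbounded, continuously growing table.
# Streaming file sources require the schema to be supplied up front.
events = (spark.readStream
          .format("json")
          .schema("word STRING, ts TIMESTAMP")   # hypothetical schema
          .load("/tmp/streaming-input"))         # hypothetical input directory

# Transformations are declared exactly as on a static DataFrame; nothing runs yet.
non_empty = events.filter(col("word").isNotNull())

# start() is the single streaming action: it launches the query, which then
# runs the declared computation incrementally as new files arrive.
query = (non_empty.writeStream
         .format("console")
         .outputMode("append")
         .start())

query.awaitTermination()  # block here; the query keeps running until stopped
```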


For any kind of processing there must be a source and a sink, so it is important to understand which sources and sinks Structured Streaming supports. One might be slightly surprised, at least I was, to learn that filesystems (local, distributed, or cloud) are among the supported streaming sources. The files that keep arriving in a directory are treated as streaming events, and computations are performed incrementally on the new data. Spark also supports Apache Kafka, an open-source, distributed event streaming platform, as a source. Another supported source is a socket, which is one endpoint of a two-way communication link between two programs or systems. We will not be limited to these sources forever: the open source community and Spark continue to add support for other sources and to develop a common Source API for streaming that will allow us to add sources of our own. Sinks are just as important as sources; without a sink, all the processing can become meaningless because the results would never be persisted for later use. Structured Streaming inherently supports Kafka and any distributed or local filesystem as sinks. Additionally, there is support for a console, a memory, and a foreach sink. The console sink prints some of the rows to the console (screen). The memory sink writes the DataFrame as an in-memory table; it is similar to the console sink, but instead of printing, the data is collected on the driver and made available as an in-memory table that can be queried interactively. Finally, the foreach sink can be used to write to systems that Spark does not support out of the box, and it can also be used to perform arbitrary parallel operations on partitions.
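Here is a hedged sketch of two of these, reading from a Kafka topic and exposing the results through the memory sink as a queryable in-memory table. The broker address, topic name, and table name are assumptions chosen for illustration.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Kafka source: assumes a broker at localhost:9092, a topic named "events",
# and the spark-sql-kafka connector available on the classpath.
raw = (spark.readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", "localhost:9092")
       .option("subscribe", "events")
       .load())

# Kafka delivers key/value as binary; cast the value to a string column.
values = raw.selectExpr("CAST(value AS STRING) AS value")

# Memory sink: rows are collected on the driver into an in-memory table
# named "events_table" that can then be queried interactively.
query = (values.writeStream
         .format("memory")
         .queryName("events_table")
         .outputMode("append")
         .start())

# Interactive query against the in-memory table (its contents grow over time).
spark.sql("SELECT * FROM events_table").show()
```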


Now that the concepts common to the Structured API and Structured Streaming have been discussed, the following sections cover some of the crucial concepts that distinguish the latter from the former. All of these concepts will be explored further in the upcoming articles.


Output Modes


Since results are generated for every micro-batch and those results have to be written somewhere, the question arises of how the data should be written. The two write modes available in the Structured API, overwrite and append, are in a way available in Structured Streaming as well, but they might not satisfy every streaming application's needs. For example, say the streaming source sends out words as events (along with the time they were generated) and we wish to compute a word count over the incoming data and write it to a sink. The words received will contain duplicates, so we have two choices: write only the counts produced by the current micro-batch, or update the previously written count whenever a word has already been seen. The second approach will seem the most intuitive to many, and there should be a way to implement it. So Structured Streaming offers another output mode, called update, which updates the previously written results as new micro-batches arrive. Append works the same way as in the Structured API. In place of the overwrite mode there is an output mode called complete, which rewrites the full result computed across all micro-batches each time a micro-batch is processed; in that sense overwrite and complete are similar. As in the Structured API, we cannot use any output mode we want without restrictions. The choice of output mode is constrained by the sink, since not all sinks support all output modes, and by the streaming query itself, since not all streaming operations support all output modes.
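To make the word-count example concrete, here is a minimal sketch using the socket source (handy for experiments with `nc -lk 9999`). The host and port are assumptions, and the comments note how the choice of output mode changes what gets written.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

spark = SparkSession.builder.getOrCreate()

# Socket source: every line received on localhost:9999 becomes a row.
lines = (spark.readStream
         .format("socket")
         .option("host", "localhost")
         .option("port", 9999)
         .load())

# Split each line into words and count occurrences per word.
words = lines.select(explode(split(lines.value, " ")).alias("word"))
counts = words.groupBy("word").count()

# update   -> write only the word counts that changed in this micro-batch
# complete -> rewrite the entire result table after every micro-batch
# append   -> not allowed for this aggregation unless a watermark is defined
query = (counts.writeStream
         .outputMode("update")
         .format("console")
         .start())

query.awaitTermination()
```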


Windowing


Windowing is the process of logically grouping data into intervals of time; the intervals are called windows. Applying windows is not mandatory, but the majority of streaming applications use them, especially applications involving aggregations, which use windows to group data with respect to time. By default, the result for a window is emitted at the end of that window, and this behaviour can be configured. We can group data based on either of the notions of time that Structured Streaming inherently supports. The window duration can be configured to be fixed, but we can also apply windows without a fixed interval. We configure this by choosing a windowing strategy, and each strategy has its own characteristics, which will be discussed in another article.
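A minimal sketch of a fixed (tumbling) window follows, assuming the same hypothetical schema with an event-time column ts; passing a slide duration to window() would produce overlapping sliding windows instead.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col

spark = SparkSession.builder.getOrCreate()

# Hypothetical stream with a "word" column and an event-time column "ts".
events = (spark.readStream
          .format("json")
          .schema("word STRING, ts TIMESTAMP")
          .load("/tmp/streaming-input"))

# Tumbling 10-minute windows on event time: each event falls into exactly one window.
# window(col("ts"), "10 minutes", "5 minutes") would create overlapping sliding windows.
windowed_counts = (events
                   .groupBy(window(col("ts"), "10 minutes"), col("word"))
                   .count())
```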


Triggers


Triggers determine when data is output and when the application should check for new input data and update its result. By default, Structured Streaming looks for new input records as soon as it has finished processing the previous micro-batch, which keeps the latency between the end of one micro-batch and the start of the next to a minimum. Whenever this default is not acceptable for a use case, we can use triggers to control when results are emitted and when the application checks for new data, for example by firing a micro-batch at a fixed processing-time interval.
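As a sketch, the query below reuses the windowed_counts DataFrame from the windowing snippet and asks for a new micro-batch at a fixed one-minute processing-time interval; the interval is an arbitrary choice for illustration.

```python
# Continuing from the windowed_counts DataFrame in the windowing sketch above.
# Without trigger(), a new micro-batch starts as soon as the previous one finishes;
# a processing-time trigger fires at a fixed cadence instead.
query = (windowed_counts.writeStream
         .outputMode("complete")
         .format("console")
         .trigger(processingTime="1 minute")
         .start())
```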


Event-time processing


Much of what event-time processing is has been discussed in a past article, so I will keep this section short. Expressing event-time processing in Structured Streaming is simple: since the data is viewed as a table, event time is just another column, and we can operate on it with SQL. Based on the event-time column, Structured Streaming can make some special decisions, including optimizations and deciding when the intermediate state can be forgotten (that is, when all the data for a window is assumed to have arrived; this can be configured). We have the option of letting Spark take care of all of this or taking control of things ourselves.
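Because the stream is just a table with an event-time column, the same aggregation can also be written in SQL by registering the streaming DataFrame as a temporary view; the view and column names below follow the earlier hypothetical schema.

```python
# Register the streaming DataFrame from the earlier sketches as a view
# and express the event-time grouping in plain SQL.
events.createOrReplaceTempView("events")

windowed_sql = spark.sql("""
    SELECT window(ts, '10 minutes') AS time_window,
           word,
           COUNT(*)                 AS cnt
    FROM events
    GROUP BY window(ts, '10 minutes'), word
""")
```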


Watermarks


Watermarks are a feature of streaming systems that lets us specify how late we expect data to arrive in event time and, consequently, when the stored state can be dropped. A watermark is used to decide when to close a window and output the result for that particular event-time window.
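A minimal sketch of a watermark on the same hypothetical stream: the 15-minute tolerance is an arbitrary choice, and with append mode a window's result is emitted only once the watermark moves past the end of that window.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col

spark = SparkSession.builder.getOrCreate()

events = (spark.readStream
          .format("json")
          .schema("word STRING, ts TIMESTAMP")
          .load("/tmp/streaming-input"))

# Accept events up to 15 minutes late relative to the maximum event time seen so far;
# state for windows older than the watermark can be dropped, and later rows are ignored.
counts = (events
          .withWatermark("ts", "15 minutes")
          .groupBy(window(col("ts"), "10 minutes"), col("word"))
          .count())

query = (counts.writeStream
         .outputMode("append")   # each window is written once the watermark passes it
         .format("console")
         .start())
```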



Conclusion


I hope this article was a good place to start. If you have any suggestions or questions, please post them in the comment box. This article, like every article on this blog, will be updated based on comments and as I find better ways of explaining things, so kindly bookmark this page and check it out whenever you need a reference.


Happy Learning! 😀

