Orleans v1.0.0 added support for streaming extensions to the programming model. Streaming extensions provide a set of abstractions and APIs that make thinking about and working with streams simpler and more robust. They allow developers to write reactive applications that operate on a sequence of events in a structured way. The extensibility model of stream providers makes the programming model compatible with and portable across a wide range of existing queuing technologies, such as Event Hubs, Service Bus, Azure Queues, and Apache Kafka. There is no need to write special code or run dedicated processes to interact with such queues.
If you already know all about Stream Processing and are familiar with technologies like Event Hubs, Kafka, Azure Stream Analytics, Apache Storm, Apache Spark Streaming, and Reactive Extensions (Rx) in .NET, you may be asking why you should care. Why do we need yet another Stream Processing System, and how are Actors related to Streams? “Why Orleans Streams?” is meant to answer that question.
There are a number of principles behind the Orleans Streams Programming Model.
Applications interact with streams via APIs that are very similar to the well-known Reactive Extensions (Rx) in .NET, using Orleans.Streams.IAsyncStream<T>, which implements the Orleans.Streams.IAsyncObserver<T> and Orleans.Streams.IAsyncObservable<T> interfaces.
In the typical example below, a device generates some data, which is sent as an HTTP request to a service running in the cloud. An Orleans client running in the front-end server receives this HTTP call and publishes the data into the matching device stream:
public async Task OnHttpCall(DeviceEvent deviceEvent)
{
    // Post data directly into the device's stream.
    IStreamProvider streamProvider = GrainClient.GetStreamProvider("myStreamProvider");
    IAsyncStream<DeviceEventData> deviceStream = streamProvider.GetStream<DeviceEventData>(deviceEvent.DeviceId);
    await deviceStream.OnNextAsync(deviceEvent.Data);
}
In another example, below, a chat user (implemented as an Orleans grain) joins a chat room, gets a handle to the stream of chat messages generated by all other users in this room, and subscribes to it. Notice that the chat user needs to know neither about the chat room grain itself (there might not be such a grain in our system) nor about the other users in that group who produce messages. Likewise, to publish to the chat stream, users don’t need to know who is currently subscribed to it. This demonstrates how chat users can be completely decoupled in time and space.
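using System;
using System.Threading.Tasks;
using Orleans;
using Orleans.Streams;

public class ChatUser : Grain
{
    public async Task JoinChat(string chatGroupName)
    {
        IStreamProvider streamProvider = base.GetStreamProvider("myStreamProvider");
        IAsyncStream<string> chatStream = streamProvider.GetStream<string>(chatGroupName);
        // The delegate receives each event plus its sequence token and must return a Task.
        await chatStream.SubscribeAsync((chatEvent, token) => Console.Out.WriteAsync(chatEvent));
    }
}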
The Quick Start Sample gives a quick overview of the overall workflow of using streams in an application. After reading it, you should read the Streams Programming APIs to get a deeper understanding of the concepts.
The Streams Programming APIs page provides a detailed description of the programming APIs.
Streams can come via physical channels of various shapes and forms and can have different semantics. Orleans Streaming is designed to support this diversity via the concept of Stream Providers, which is an extensibility point in the system. Orleans currently has implementations of two stream providers: the TCP-based Simple Message Stream Provider and the Azure Queue-based Azure Queue Stream Provider. More details on Stream Providers can be found at Stream Providers.
Stream Subscription Semantics:
Orleans Streams guarantee Sequential Consistency for Stream Subscription operations. Specifically, when a consumer subscribes to a stream, once the Task representing the subscription operation has successfully resolved, the consumer will see all events that are generated after it subscribed. In addition, Rewindable streams allow a consumer to subscribe from an arbitrary point in time in the past by using a StreamSequenceToken (more details can be found here).
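The difference between a regular subscription and a rewindable one can be illustrated with a small in-memory model. This is only a sketch and does not use the Orleans API: RewindableLog<T>, Subscribe, and SubscribeFrom are hypothetical names, a plain integer offset stands in for a StreamSequenceToken, and everything runs on a single thread.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for a rewindable stream (not an Orleans type).
public sealed class RewindableLog<T>
{
    private readonly List<T> events = new List<T>();
    private readonly List<Action<T>> subscribers = new List<Action<T>>();

    // Stores the event and delivers it to every current subscriber.
    public void Publish(T item)
    {
        events.Add(item);
        foreach (Action<T> subscriber in subscribers)
        {
            subscriber(item);
        }
    }

    // Regular subscription: only events published after this call are seen.
    public void Subscribe(Action<T> onNext)
    {
        subscribers.Add(onNext);
    }

    // Rewindable subscription: replays stored events starting at the given
    // offset (the offset plays the role of a sequence token), then continues live.
    public void SubscribeFrom(int offset, Action<T> onNext)
    {
        if (offset < 0 || offset > events.Count)
        {
            throw new ArgumentOutOfRangeException(nameof(offset));
        }
        for (int i = offset; i < events.Count; i++)
        {
            onNext(events[i]);
        }
        subscribers.Add(onNext);
    }
}

public static class RewindableLogDemo
{
    public static void Main()
    {
        var log = new RewindableLog<string>();
        log.Publish("e0");
        log.Publish("e1");

        var live = new List<string>();
        var replayed = new List<string>();
        log.Subscribe(live.Add);            // sees only later events
        log.SubscribeFrom(1, replayed.Add); // rewinds to offset 1 first

        log.Publish("e2");

        Console.WriteLine(string.Join(",", live));     // prints "e2"
        Console.WriteLine(string.Join(",", replayed)); // prints "e1,e2"
    }
}
```

In a real provider the replay would come from the underlying queue or log, and whether rewinding is available at all depends on the stream provider.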
Individual Stream Events Delivery Guarantees:
Individual event delivery guarantees depend on the individual stream provider. Some provide only best-effort, at-most-once delivery (such as Simple Message Streams), while others provide at-least-once delivery (such as Azure Queue Streams). It is even possible to build a stream provider that guarantees exactly-once delivery (we don’t have such a provider yet, but it is possible to build one with the extensibility model).
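With an at-least-once provider, a consumer may see the same event more than once, so consumers are often written to be idempotent. The sketch below shows one common way to do that in application code. It is not part of Orleans: TaggedEvent and IdempotentConsumer are hypothetical names, and the sketch assumes the producer attaches a unique id to every event.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical event envelope; the EventId is an assumption of this sketch
// and must be assigned by the producer.
public sealed class TaggedEvent
{
    public TaggedEvent(Guid eventId, string payload)
    {
        EventId = eventId;
        Payload = payload;
    }

    public Guid EventId { get; }
    public string Payload { get; }
}

// Remembers the ids it has already handled and ignores redeliveries.
public sealed class IdempotentConsumer
{
    private readonly HashSet<Guid> seen = new HashSet<Guid>();

    public List<string> Processed { get; } = new List<string>();

    // Returns true if the event was processed, false if it was a duplicate.
    public bool OnNext(TaggedEvent e)
    {
        if (!seen.Add(e.EventId))
        {
            return false; // already handled, drop the redelivery
        }
        Processed.Add(e.Payload);
        return true;
    }
}

public static class IdempotentConsumerDemo
{
    public static void Main()
    {
        var consumer = new IdempotentConsumer();
        var first = new TaggedEvent(Guid.NewGuid(), "temp=21");
        var second = new TaggedEvent(Guid.NewGuid(), "temp=22");

        consumer.OnNext(first);
        consumer.OnNext(first);  // simulated redelivery of the same event
        consumer.OnNext(second);

        Console.WriteLine(consumer.Processed.Count); // prints "2"
    }
}
```

The set of seen ids grows without bound in this sketch; a real consumer would typically expire old ids or persist them alongside its state.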
Events Delivery Order:
Event order also depends on the particular stream provider. In SMS streams, the producer explicitly controls the order of events seen by the consumer by controlling the way it publishes them. Azure Queue streams do not guarantee FIFO order, since the underlying Azure Queues do not guarantee order in failure cases. Applications can also control their own stream delivery ordering by using StreamSequenceToken.
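When the provider does not guarantee FIFO order, an application can restore order itself if each event carries a sequence number. The following is a minimal sketch of such a reorder buffer, written without the Orleans API: ReorderBuffer<T> is a hypothetical helper, and a plain long sequence number assigned by the producer stands in for a StreamSequenceToken.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: buffers out-of-order events and releases them in sequence.
public sealed class ReorderBuffer<T>
{
    private readonly SortedDictionary<long, T> pending = new SortedDictionary<long, T>();
    private long next;

    public ReorderBuffer(long firstSequence)
    {
        next = firstSequence;
    }

    // Accepts an event in any arrival order and returns the events that
    // have become deliverable, in sequence order (possibly none).
    public IReadOnlyList<T> Accept(long sequence, T item)
    {
        var ready = new List<T>();
        if (sequence < next || pending.ContainsKey(sequence))
        {
            return ready; // already delivered or already buffered
        }

        pending[sequence] = item;
        while (pending.TryGetValue(next, out T value))
        {
            ready.Add(value);
            pending.Remove(next);
            next++;
        }
        return ready;
    }
}

public static class ReorderBufferDemo
{
    public static void Main()
    {
        var buffer = new ReorderBuffer<string>(0);
        var delivered = new List<string>();

        delivered.AddRange(buffer.Accept(1, "b")); // held back, 0 is still missing
        delivered.AddRange(buffer.Accept(0, "a")); // releases "a" and then "b"
        delivered.AddRange(buffer.Accept(2, "c")); // in order, released at once

        Console.WriteLine(string.Join(",", delivered)); // prints "a,b,c"
    }
}
```

This sketch waits indefinitely for a missing sequence number; a real consumer would need a policy for gaps, such as a timeout or a bounded buffer.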
The Orleans Streams Implementation provides a high-level overview of the internal implementation.
The Orleans Streams Extensibility describes how to extend streams with new functionality.
More examples of how to use streaming APIs within a grain can be found here. We plan to create more samples in the future.