Orleans Documentation

Client Observers

有些情况下简单的消息/响应模式是不够的,并且客户端需要收到异步通知。 例如,一个用户可能想要在他的朋友发布新即时消息的时候收到通知。

客户端观察者是一个允许异步通知客户端的机制。 一个观察者是一个继承自IGrainObserver的单向的异步接口,并且它的所有方法都是void的。 grain通过像调用grain接口的方法一样调用观察者的方法来发送一个通知给观察者,不同是观察者方法没有返回值并且grain不会依赖调用结果。 Orleans运行时将会确保单向传递通知的成功。 发布通知的grain应该提供API来添加或者删除观察者。 另外,为了方便通常会暴露一个取消已有订阅的方法。 grain的开发者可以使用Orleans的ObserverSubscriptionManager<T>泛型类来简化被观察的grain类型的开发。

为了订阅一个通知,客户端必须首先创建一个本地的实现了观察者接口的C#对象。 然后调用然后调用观察者工厂的CreateObjectReference()方法来把C#对象转换成一个grain引用,这样就可以传递给通知grain的订阅方法了。

这个模型也可以用作其他grain接口异步通知。 不像客户端订阅的情况,订阅的grain只要实现观察者接口,并且把自身作为一个引用传递给自己(例如this.AsReference)。

代码例子

让我们假设有一个grain周期性地给客户端发送消息。为了简单期间,我们例子中的消息将会是一个字符串。我们首先在客户端定义接收这个消息的接口。

这个接口将会是这样

public interface IChat : IGrainObserver
{
    void ReceiveMessage(string message);
}

唯一特殊的一点是这个接口要继承自IGrainObserver。现在任何想要接受这些消息的客户端应该实现一个实现了IChat接口的类。

最简单的例子是这样的:

public class Chat : IChat
{
    public void ReceiveMessage(string message)
    {
        Console.WriteLine(message);
    }
}

现在在服务器上我应该有一个发送聊天消息给客户端的grain。这个grain也应该有让客户端来订阅和退订他们自己的机制。订阅这个grain可以使用实用类ObserverSubscriptionManager:

class HelloGrain : Grain, IHello
{
    private ObserverSubscriptionManager<IChat> _subsManager;

    public override async Task OnActivateAsync()
    {
        // We created the utility at activation time.
        _subsManager = new ObserverSubscriptionManager<IChat>();
        await base.OnActivateAsync();
    }

    // Clients call this to subscribe.
    public async Task Subscribe(IChat observer)
    {
        _subsManager.Subscribe(observer);
    }

    //Also clients use this to unsubscribe themselves to no longer receive the messages.
    public async Task UnSubscribe(IChat observer)
    {
        _SubsManager.Unsubscribe(observer);
    }
}

给客户端发消息可以使用ObserverSubscriptionManager<IChat>实例Notify方法。这个方法接受一个Action<T>方法或者lambda表达式(这里的T是IChat的类型)

public Task SendUpdateMessage(string message)
{
    _SubsManager.Notify(s => s.ReceiveMessage(message));
    return TaskDone.Done;
}

现在我们的服务器已经有一个发送消息给观察者客户端的方法,有两个订阅/退订的方法和一个实现了一个可以观察grain消息的类的客户端。最后的异步是在客户端创建一个使用我们之前实现的Chat 类的观察者的引用并且让它在订阅后接受消息。

代码如下:

//首先创建grain的引用
var friend = GrainClient.GrainFactory.GetGrain<IHello>(0);
Chat c = new Chat();

//为chat创建一个用来订阅可观察的grain的引用。
var obj = await GrainClient.GrainFactory.CreateObjectReference<IChat>(c);
//订阅这个实例来接受消息。
await friend.Subscribe(obj);

现在无论何时我们服务器上的grain调用SendUpdateMessage方法,所有订阅了得客户端都将受到消息。在我们的客户端代码里,Chat的实例变量c会受到消息并且打印到控制台。

注意: 传递给CreateObjectReference的对象是通过通过一个WeakReference<T>实现并且因此如果没有其他的引用存在会被垃圾收集掉。用户应该为每一个不像被垃圾回收的观察者维护一个引用。

注意: 对观察者的支持可能在将来会被移除并且被简单消息流SMS取代,简单消息流能更强大灵活和可靠地支持同样的功能。

Next

下面我们看一下开发一个grain