Category Archives: Rx

Rx – message bus

You can use Reactive Extensions to create message bus – a communication component to publish and subscribe to messages. See basic implementation.

Here I have simple Message class and MessageBus based on Rx ISubject. Bus has only two methods: one for sending the message and one for handling published message.

  class MessageBus
  {
    private readonly ISubject<Message> _subject;

    public MessageBus()
    {
      _subject = new Subject<Message>();
    }

    public void Publish(Message msg)
    {
      if (string.IsNullOrEmpty(msg.Recipient))
      {
        _subject.OnError(new InvalidOperationException("Message recipient is unknown."));
      }
      else
      {
        _subject.OnNext(msg);
      }
    }

    public void Subscribe(string recipient, Action<Message> action)
    {
      _subject
        .Where(m => m.Recipient == recipient)
        .Subscribe(action);
    }
  }
    private static void UseMessageBus()
    {
      //create message bus
      var b = new MessageBus();

      //subscribe to receive messages for two people
      //subscribing for messages to Bob only
      b.Subscribe("Bob", m => Console.WriteLine($"Bob, you got a message: {m.Content}"));
      //subscribing for messages to Mark only
      b.Subscribe("Mark", m => Console.WriteLine($"Message to you, Mark: {m.Content}"));
      
      //publish some messages
      b.Publish(new Message("Mark", "Hello, Mark!"));
      b.Publish(new Message("Bob", "Hello, Bob!"));
      b.Publish(new Message("John", "Hello, John!")); //we are not subscribed to, will not be handled
      b.Publish(new Message("Mark", "Do shopping, please!"));
      b.Publish(new Message("Mark", "Clean your room, please!"));
    }
Console output after running the code

Rx – Replacing .NET events

Reactive extensions (Rx, System.Reactive) can be described as LINQ to events.

How to replace typical .NET event handling with RX.

Imagine we have Director class. In this class we publish InvitationCreated event. We want to subscribe to this event.

Basic .NET events scenario.

  /// <summary>
  /// Director is publishing <see cref="InvitationCreated"/> events.
  /// </summary>
  class Director
  {
    public event EventHandler<InviteEventArgs> InvitationCreated;
    public string Name { get; private set; }

    public Director(string name)
    {
      Name = name;
    }

    public void MakeInvitation(string title)
    {
      Console.WriteLine($"I'm director {Name} and I'm inviting to: {title}");
      InvitationCreated?.Invoke(this, new InviteEventArgs(new Invitation() { MeetingTitle = title }));
    }
  }
    /// <summary>
    /// Here we are listening to published events
    /// </summary>
    private static void ObserveSimple()
    {
      Director director1 = new Director("John");
      director1.InvitationCreated += (s, e) => 
Console.WriteLine($"We noticed invitation: {e.Invitation.MeetingTitle}");
      director1.MakeInvitation("Business lunch");
      director1.MakeInvitation("Important week summary meeting");
    }
Console output after running this code

The same scenario, but using Rx:

  class DirectorRx
  {
    public event EventHandler<InviteEventArgs> InvitationCreated;
    public string Name { get; private set; }

    /// <summary>
    /// Wrapping InvitationCreated event with Rx
    /// Now you can subscribe the typical c# way and via rx
    /// </summary>
    public IObservable<Invitation> WhenDirectorInvited => Observable
      .FromEventPattern<InviteEventArgs>((h) => InvitationCreated += h, (h) => InvitationCreated -= h)
      .Select(x => x.EventArgs.Invitation);

    public DirectorRx(string name)
    {
      Name = name;
    }

    public void MakeInvitation(string title)
    {
      Console.WriteLine($"I'm director {Name} and I'm inviting to: {title}");
      InvitationCreated?.Invoke(this, new InviteEventArgs(new Invitation() { MeetingTitle = title }));
    }
  }
    /// <summary>
    /// Here we subscribe to events using Rx
    /// </summary>
    static void ObserveInvitationsViaRx()
    {
      var director1 = new DirectorRx("John");

      //we are subscribing to events, but filtering only important ones
      var subscriptionForImportantMeetings = director1.WhenDirectorInvited
        .Where(x => x.MeetingTitle.StartsWith("Important"))
        .Subscribe(i => Console.WriteLine($"We noticed important invitation: {i.MeetingTitle}"));

      //we are subscribing to events, but filtering only less important ones
      var subscriptionForOtherMeetings = director1.WhenDirectorInvited
        .Where(x => !x.MeetingTitle.StartsWith("Important"))
        .Subscribe(i => Console.WriteLine($"We noticed low priority invitation: {i.MeetingTitle}"));

      director1.MakeInvitation("Business lunch");
      director1.MakeInvitation("Important week summary meeting");

      subscriptionForImportantMeetings.Dispose();
      subscriptionForOtherMeetings.Dispose();

      //subscription disposed, we do not observe events anymore
      director1.MakeInvitation("Too late invitation");
    }
Console output after running this code