Async sequential workflows

Posted on Apr 26, 2012 in and tagged ,

В последнее время всё чаще приходится сталкиваться с задачами, требующими выполнения последовательных асинхронных операций. Примером может служить загрузка файлов на мобильном устройстве. Учитывая что телефон может работать на слабом мобильном соединении, нам вряд ли удастся получить прирост производительности от параллельной загрузки. Да и стандартный класс WebClient, представленный в BCL не очень дружит с concurrent operations (точнее вообще не дружит). Создавать новый экземпляр на каждый запрос кажется идейно неправильным, так что попробуем обойтись одним веб-клиентом, обрабатывающим множество последовательных запросов.

Для начала – тривиальная синхронная реализация (которая конечно же не будет работать на WP7 из-за отсутствия синхронных методов у WebClient‘а):

public IEnumerable<string> Handle(IEnumerable<Uri> requests)
{
    var client = new WebClient();
 
    foreach (var request in requests)
    {
        yield return client.DownloadString(request);
    }
}

Разработчику, использующему асинхронную версии, скорее всего захочется узнать о моменте завершении загрузок. Не будем плохими мальчиками/девочками и отбросим мысли о EAP и ручном CPS. Довольно очевидным решением будет представить наши ожидающие загрузки файлы в виде потока событий при помощи observables. К сожалению, WebClient использует EAP для своих асинхронных операций, поэтому код становится чуть более неопрятным:

public IObservable<string> HandleAsync(IList<Uri> requests)
{
    var client = new WebClient();
    var subject = new Subject<string>();
    var enumerator = requests.GetEnumerator();
 
    
    Action takeNext = () =>
    {
        if (enumerator.MoveNext())
        {
            client.DownloadStringAsync(enumerator.Current);
        }
        else
        {
            subject.OnCompleted();
        }
    };
 
    client.DownloadStringCompleted += (s, e) =>
    {
        if (e.Error != null)
        {
            subject.OnError(e.Error);
        }
        else
        {
            subject.OnNext(e.Result);
            takeNext();
        }
    };
 
    takeNext();
 
    return subject;
}

В обоих случаях мы по сути описываем корутины, передающие управление в момент окончания обработки запроса. В синхронной реализации компилятор разворачивает yield в конечный автомат, генерирующий поток ответов, в асинхронной мы создаём его самостоятельно через замыкание на enumerator и итерацию внутри обработчика события DownloadStringCompleted. Те, кто уже использует async/await, могут переложить часть забот по описанию автомата на компилятор:

public async void HandleTaskAsync(IList<Uri> requests, Action<IObservable<string>> setObservable)
{
    var subject = new Subject();
    setObservable(subject);
 
    var client = new WebClient();
 
    foreach (var request in requests)
    {
        var response = await client.DownloadStringTaskAsync(request);
        subject.OnNext(response);
    }
}

Однако и в этом случае не обошлось без некоторых костылей – чтобы передать observable вызывающему методу, приходится использовать дополнительный параметр Action. Можно было бы воспользоваться интерфейсом IProgress, но для целостности будем рассматривать только IObservable.

Можно заметить, что на самом деле все эти 3 реализации очень похожи и отличаются лишь способом вызова континуаций (на следующей кликабельной картинке одинаковыми цветами отмечены аналогичные участки кода для каждого варианта).

Вернёмся к нашей асинхронной реализации с использованием observables. Хотелось бы всё-таки сделать её более обобщённой чтобы мы могли использовать использовать её подобно другим функциям высшего порядка из ObservableExtensions. Если ограничиться только EAP, то у любой задачи с последовательными асинхронными операциями есть:

  • функция run: T -> unit, запускающая операцию для очередного элемента входной последовательности
  • событие окончания выполнения операции completed: (object * TEventArgs) -> unit
  • параметрический тип TEventArgs, как правило содержащий информацию о результате выполнения операции или о произошедшей ошибке

Следовательно, можно написать следующую функцию:

public static IObservable<TResult> SelectFromAsyncEventPattern
    <TSource, TResult, TDelegate, TEventArgs>(
    this IEnumerable<TSource> source,
    Action<TSource> run,
    Func<EventHandler<TEventArgs>, TDelegate> conversion,
    Action<TDelegate> subscribe,
    Action<TDelegate> unsubscribe,
    Func<TEventArgs, TResult> mapResult,
    Func<TEventArgs, Exception> mapException)
 
    where TEventArgs: EventArgs
{
    var enumerator = source.GetEnumerator();
    var subject = new Subject<TResult>();
    var convertedHandler = default(TDelegate);
 
    Action takeNext = () =>
    {
        if (enumerator.MoveNext())
        {
            run(enumerator.Current);
        }
        else
        {
            subject.OnCompleted();
            unsubscribe(convertedHandler);
        }
    };
 
    EventHandler<TEventArgs> handler = (s, e) =>
    {
        var exception = mapException(e);
        if (exception != null)
        {
            subject.OnError(exception);
            unsubscribe(convertedHandler);
        }
        else
        {
            var result = mapResult(e);
            subject.OnNext(result);
            takeNext();
        }
    };
 
    convertedHandler = conversion(handler);
    subscribe(convertedHandler);
 
    takeNext();
    return subject;
}

На первый взгляд сигнатура может показаться не очень понятной и дружелюбной. В частности, пришлось ввести ещё один параметрический тип TDelegate и параметры conversion, subscribe и unsubscribe для обобщения события завершения операции (подобно методу Observable.FromEventPattern из Rx). В остальном код практически идентичен изначальной асинхронной реализации – подписываемся на событие, при каждом срабатывании сообщаем подписчикам о новом значении, аналогично уведомляем их при ошибке или после обработки всех элементов входной последовательности.

Пример использования:

public IObservable<string> HandleAsync(IList<Uri> requests)
{
    var client = new WebClient();
 
    return requests.SelectFromAsyncEventPattern
        <Uri, string, DownloadStringCompletedEventHandler,
            DownloadStringCompletedEventArgs>(
        client.DownloadStringAsync,
        handler => (s, e) => handler(s, e),
        x => client.DownloadStringCompleted += x,
        x => client.DownloadStringCompleted -= x,
        x => x.Result,
        x => x.Error);
}

К сожалению, компилятору не удаётся выполнить автоматический вывод типов, поэтому приходится указывать их вручную. Однако такой код всё равно выглядит лучше множественных вложенных континуаций.

Посмотреть и скачать примеры кода можно на Bitbucket.

Building cool animated tiles with Rx

Posted on Nov 15, 2011 in and tagged ,

I have a small WP7 project called Summarize. It is a fun math-based game (check it out for free).

It has a nice effect of loading tiles. Several people has asked me how I have done it, so in this post I will cover how to build something similar on your own.

Let’s start with building the UI. We have the toolkit’s WrapPanel inside the ItemsControl and a simple TileControl with TextBlock displaying the tile number.

TileControl.xaml markup:

<UserControl 
    x:Name="UserControl"
    x:Class="AnimatedTilesSample.TileControl"
    xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
    xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
    FontFamily="{StaticResource PhoneFontFamilyNormal}"
    FontSize="{StaticResource PhoneFontSizeNormal}"
    Foreground="{StaticResource PhoneForegroundBrush}"
    Loaded="OnLoaded"
    Width="76" Height="76">
    
    <UserControl.Projection>
        <PlaneProjection RotationY="-180"/>
    </UserControl.Projection>
    
    <UserControl.Resources>
        <Storyboard x:Key="LoadedStoryboard">
            <DoubleAnimation 
                Storyboard.TargetProperty="(UIElement.Projection).(PlaneProjection.RotationY)"
                Storyboard.TargetName="UserControl"
                To="0"
                Duration="0:0:0.2"/>
            <ObjectAnimationUsingKeyFrames
                Storyboard.TargetProperty="UIElement.Visibility"
                Storyboard.TargetName="Text">
                <DiscreteObjectKeyFrame 
                    KeyTime="0:0:0.1">
                    <DiscreteObjectKeyFrame.Value>
                        <Visibility>Visible</Visibility>
                    </DiscreteObjectKeyFrame.Value>
                </DiscreteObjectKeyFrame>
            </ObjectAnimationUsingKeyFrames>
        </Storyboard>
    </UserControl.Resources>
    
    <Grid 
        x:Name="LayoutRoot"
        Background="{StaticResource PhoneChromeBrush}">
        <TextBlock 
            x:Name="Text"
            HorizontalAlignment="Center"
            VerticalAlignment="Center"
            Visibility="Collapsed"
            Style="{StaticResource PhoneTextTitle2Style}"
            Text="{Binding ., Mode=OneTime}"/>
    </Grid>
</UserControl>

In the code-behind we start the animation when the tile is loaded:

private void OnLoaded(object sender, RoutedEventArgs e)
{
    var sb = Resources["LoadedStoryboard"] as Storyboard;
    if (sb != null)
    {
        sb.Begin();
    }
}

MainPage.xaml markup:

<phone:PhoneApplicationPage 
    x:Class="AnimatedTilesSample.MainPage"
    xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
    xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
    xmlns:phone="clr-namespace:Microsoft.Phone.Controls;assembly=Microsoft.Phone"
    xmlns:toolkit="clr-namespace:Microsoft.Phone.Controls;assembly=Microsoft.Phone.Controls.Toolkit"
    xmlns:local="clr-namespace:AnimatedTilesSample"
    FontFamily="{StaticResource PhoneFontFamilyNormal}"
    FontSize="{StaticResource PhoneFontSizeNormal}"
    Foreground="{StaticResource PhoneForegroundBrush}"
    SupportedOrientations="Portrait" Orientation="Portrait" >
     <Grid x:Name="LayoutRoot" Background="Transparent">
        <Grid x:Name="ContentPanel" Margin="12,0,12,0">
            <ItemsControl
                x:Name="Cells">
                <ItemsControl.ItemsPanel>
                    <ItemsPanelTemplate>
                        <toolkit:WrapPanel
                            Orientation="Horizontal"/>
                    </ItemsPanelTemplate>
                </ItemsControl.ItemsPanel>
                <ItemsControl.ItemTemplate>
                    <DataTemplate>
                        <local:TileControl Margin="2"/>
                    </DataTemplate>
                </ItemsControl.ItemTemplate>
            </ItemsControl>
            
            <Button
                HorizontalAlignment="Left"
                VerticalAlignment="Bottom"
                x:Name="PlayAnimationButton"
                Content="Play animation"
                Click="PlayAnimationHandler"/>
        </Grid>
    </Grid>
 
</phone:PhoneApplicationPage>

Clicking the button generates the observable sequence producing values every 80 milliseconds. We use only the first 25 values and when the every next value is available, the appropriate tile is created and added to the list. Adding the tile starts its ‘loaded’ animation.

private void PlayAnimationHandler(object sender, RoutedEventArgs e)
{
    Cells.Items.Clear();
    PlayAnimationButton.IsEnabled = false;


    Observable.Interval(TimeSpan.FromMilliseconds(80))
        .Take(25)
        .ObserveOnDispatcher()
        .Finally(() =>
        {
            PlayAnimationButton.IsEnabled = true;
        })
        .Subscribe(x => Cells.Items.Add(x));
}

Because changes to ObservableCollection should occur only on the thread that it was created on, I used DispatcherObservable.ObserveOnDispatcher extension method that schedules all observer actions on the runtime dispatcher.

You can download sample app with sources here.