Async sequential workflows

Posted on Apr 26, 2012 in and tagged ,

В последнее время всё чаще приходится сталкиваться с задачами, требующими выполнения последовательных асинхронных операций. Примером может служить загрузка файлов на мобильном устройстве. Учитывая что телефон может работать на слабом мобильном соединении, нам вряд ли удастся получить прирост производительности от параллельной загрузки. Да и стандартный класс WebClient, представленный в BCL не очень дружит с concurrent operations (точнее вообще не дружит). Создавать новый экземпляр на каждый запрос кажется идейно неправильным, так что попробуем обойтись одним веб-клиентом, обрабатывающим множество последовательных запросов.

Для начала – тривиальная синхронная реализация (которая конечно же не будет работать на WP7 из-за отсутствия синхронных методов у WebClient‘а):

public IEnumerable<string> Handle(IEnumerable<Uri> requests)
{
    var client = new WebClient();
 
    foreach (var request in requests)
    {
        yield return client.DownloadString(request);
    }
}

Разработчику, использующему асинхронную версии, скорее всего захочется узнать о моменте завершении загрузок. Не будем плохими мальчиками/девочками и отбросим мысли о EAP и ручном CPS. Довольно очевидным решением будет представить наши ожидающие загрузки файлы в виде потока событий при помощи observables. К сожалению, WebClient использует EAP для своих асинхронных операций, поэтому код становится чуть более неопрятным:

public IObservable<string> HandleAsync(IList<Uri> requests)
{
    var client = new WebClient();
    var subject = new Subject<string>();
    var enumerator = requests.GetEnumerator();
 
    
    Action takeNext = () =>
    {
        if (enumerator.MoveNext())
        {
            client.DownloadStringAsync(enumerator.Current);
        }
        else
        {
            subject.OnCompleted();
        }
    };
 
    client.DownloadStringCompleted += (s, e) =>
    {
        if (e.Error != null)
        {
            subject.OnError(e.Error);
        }
        else
        {
            subject.OnNext(e.Result);
            takeNext();
        }
    };
 
    takeNext();
 
    return subject;
}

В обоих случаях мы по сути описываем корутины, передающие управление в момент окончания обработки запроса. В синхронной реализации компилятор разворачивает yield в конечный автомат, генерирующий поток ответов, в асинхронной мы создаём его самостоятельно через замыкание на enumerator и итерацию внутри обработчика события DownloadStringCompleted. Те, кто уже использует async/await, могут переложить часть забот по описанию автомата на компилятор:

public async void HandleTaskAsync(IList<Uri> requests, Action<IObservable<string>> setObservable)
{
    var subject = new Subject();
    setObservable(subject);
 
    var client = new WebClient();
 
    foreach (var request in requests)
    {
        var response = await client.DownloadStringTaskAsync(request);
        subject.OnNext(response);
    }
}

Однако и в этом случае не обошлось без некоторых костылей – чтобы передать observable вызывающему методу, приходится использовать дополнительный параметр Action. Можно было бы воспользоваться интерфейсом IProgress, но для целостности будем рассматривать только IObservable.

Можно заметить, что на самом деле все эти 3 реализации очень похожи и отличаются лишь способом вызова континуаций (на следующей кликабельной картинке одинаковыми цветами отмечены аналогичные участки кода для каждого варианта).

Вернёмся к нашей асинхронной реализации с использованием observables. Хотелось бы всё-таки сделать её более обобщённой чтобы мы могли использовать использовать её подобно другим функциям высшего порядка из ObservableExtensions. Если ограничиться только EAP, то у любой задачи с последовательными асинхронными операциями есть:

  • функция run: T -> unit, запускающая операцию для очередного элемента входной последовательности
  • событие окончания выполнения операции completed: (object * TEventArgs) -> unit
  • параметрический тип TEventArgs, как правило содержащий информацию о результате выполнения операции или о произошедшей ошибке

Следовательно, можно написать следующую функцию:

public static IObservable<TResult> SelectFromAsyncEventPattern
    <TSource, TResult, TDelegate, TEventArgs>(
    this IEnumerable<TSource> source,
    Action<TSource> run,
    Func<EventHandler<TEventArgs>, TDelegate> conversion,
    Action<TDelegate> subscribe,
    Action<TDelegate> unsubscribe,
    Func<TEventArgs, TResult> mapResult,
    Func<TEventArgs, Exception> mapException)
 
    where TEventArgs: EventArgs
{
    var enumerator = source.GetEnumerator();
    var subject = new Subject<TResult>();
    var convertedHandler = default(TDelegate);
 
    Action takeNext = () =>
    {
        if (enumerator.MoveNext())
        {
            run(enumerator.Current);
        }
        else
        {
            subject.OnCompleted();
            unsubscribe(convertedHandler);
        }
    };
 
    EventHandler<TEventArgs> handler = (s, e) =>
    {
        var exception = mapException(e);
        if (exception != null)
        {
            subject.OnError(exception);
            unsubscribe(convertedHandler);
        }
        else
        {
            var result = mapResult(e);
            subject.OnNext(result);
            takeNext();
        }
    };
 
    convertedHandler = conversion(handler);
    subscribe(convertedHandler);
 
    takeNext();
    return subject;
}

На первый взгляд сигнатура может показаться не очень понятной и дружелюбной. В частности, пришлось ввести ещё один параметрический тип TDelegate и параметры conversion, subscribe и unsubscribe для обобщения события завершения операции (подобно методу Observable.FromEventPattern из Rx). В остальном код практически идентичен изначальной асинхронной реализации – подписываемся на событие, при каждом срабатывании сообщаем подписчикам о новом значении, аналогично уведомляем их при ошибке или после обработки всех элементов входной последовательности.

Пример использования:

public IObservable<string> HandleAsync(IList<Uri> requests)
{
    var client = new WebClient();
 
    return requests.SelectFromAsyncEventPattern
        <Uri, string, DownloadStringCompletedEventHandler,
            DownloadStringCompletedEventArgs>(
        client.DownloadStringAsync,
        handler => (s, e) => handler(s, e),
        x => client.DownloadStringCompleted += x,
        x => client.DownloadStringCompleted -= x,
        x => x.Result,
        x => x.Error);
}

К сожалению, компилятору не удаётся выполнить автоматический вывод типов, поэтому приходится указывать их вручную. Однако такой код всё равно выглядит лучше множественных вложенных континуаций.

Посмотреть и скачать примеры кода можно на Bitbucket.

Task Commands revisited

Posted on Feb 23, 2012 in and tagged , , , ,

Half a year ago I blogged about using TPL with MVVM in a test-friendly way. My solution turned out to be pretty handy, though it was far from being perfect. In this post I will describe what was wrong with the initial implementation and can we slightly improve it.

Looking at that solution right now, I wonder why did I decide to use event async pattern (providing the ExecuteCompleted event) in the first place. The intention was to notify the caller about completion of the asynchronous operation. That is what System.Threading.Task is used for this days, isn’t it? So the obvious solution is to expose the ExecuteAsync() method returning the Task instance on the command class. Here is the source of the updated AsyncRelayCommand:

public class AsyncRelayCommand<T> : ICommand {
    private readonly RelayCommand<T> _internalCommand;
    private readonly Func<T, Task> _executeMethod;
 
    public event EventHandler CanExecuteChanged
    {
        add { _internalCommand.CanExecuteChanged += value; }
        remove { _internalCommand.CanExecuteChanged -= value; }
    }
 
    public AsyncRelayCommand(Func<T, Task> executeMethod, Predicate<T> canExecuteMethod)
    {
        if (executeMethod == null)
        {
            throw new ArgumentNullException("executeMethod");
        }
 
        _executeMethod = executeMethod;
        _internalCommand = new RelayCommand<T>(_ => { }, canExecuteMethod);
    }
 
    public AsyncRelayCommand(Func<T, Task> executeMethod)
        : this(executeMethod, _ => true) { }
 
    void ICommand.Execute(object parameter)
    {
        if (parameter is T)
        {
            ExecuteAsync((T)parameter);
        }
 
        else throw new ArgumentException("Parameter should be of type " + typeof(T));
    }
 
    public Task ExecuteAsync(T parameter)
    {
        return _executeMethod(parameter);
    }
 
    public bool CanExecute(object parameter)
    {
        return _internalCommand.CanExecute((T)parameter);
    }
 
    public void RaiseCanExecuteChanged()
    {
        _internalCommand.RaiseCanExecuteChanged();
    }
}

The code is pretty straightforward, here are the key points:

  • Create an inner command that responds to CanExecute, RaiseCanExecute calls and handles subscriptions to CanExecuteChanged event (optional, since I am using MVVM Light with most of my projects, that’s an acceptable solution for me. However, you may easily implement your own commanding logic).
  • Introduce new ExecuteAsync method that returns Task instance.
  • Implement ICommand.Execute explicitly (I find it more consistent not to have Execute method in the AsyncRelayCommand class).

Side note: as I mentioned in the beginning, I find this command extension rather useful. It was successfully used in 5 own projects (both WP7 and WPF), and I hope you enjoy it too.

As usual, the source code is available at bitbucket (TaskCommands.v2.zip).