Async sequential workflows

В последнее время всё чаще приходится сталкиваться с задачами, требующими выполнения последовательных асинхронных операций. Примером может служить загрузка файлов на мобильном устройстве. Учитывая что телефон может работать на слабом мобильном соединении, нам вряд ли удастся получить прирост производительности от параллельной загрузки. Да и стандартный класс WebClient, представленный в BCL не очень дружит с concurrent operations (точнее вообще не дружит). Создавать новый экземпляр на каждый запрос кажется идейно неправильным, так что попробуем обойтись одним веб-клиентом, обрабатывающим множество последовательных запросов.

Для начала – тривиальная синхронная реализация (которая конечно же не будет работать на WP7 из-за отсутствия синхронных методов у WebClient‘а):

public IEnumerable<string> Handle(IEnumerable<Uri> requests) {
  var client = new WebClient();
  foreach (var request in requests) {
    yield return client.DownloadString(request);
  }
}

Разработчику, использующему асинхронную версии, скорее всего захочется узнать о моменте завершении загрузок. Не будем плохими мальчиками/девочками и отбросим мысли о EAP и ручном CPS. Довольно очевидным решением будет представить наши ожидающие загрузки файлы в виде потока событий при помощи observables. К сожалению, WebClient использует EAP для своих асинхронных операций, поэтому код становится чуть более неопрятным:

public IObservable<string> HandleAsync(IList<Uri> requests)
{
	var client = new WebClient();
	var subject = new Subject<string>();
	var enumerator = requests.GetEnumerator();
	Action takeNext = () =>
	{
		if (enumerator.MoveNext()) { client.DownloadStringAsync(enumerator.Current); } else { subject.OnCompleted(); }
	};

	client.DownloadStringCompleted += (s, e) =>
	{
		if (e.Error != null) { subject.OnError(e.Error); } else { subject.OnNext(e.Result); takeNext(); }
	};
	
	takeNext();
	return subject;
}

В обоих случаях мы по сути описываем корутины, передающие управление в момент окончания обработки запроса. В синхронной реализации компилятор разворачивает yield в конечный автомат, генерирующий поток ответов, в асинхронной мы создаём его самостоятельно через замыкание на enumerator и итерацию внутри обработчика события DownloadStringCompleted. Те, кто уже использует async/await, могут переложить часть забот по описанию автомата на компилятор:

public async void HandleTaskAsync(IList<Uri> requests, Action<IObservable<string>> setObservable)
{
	var subject = new Subject<string>();
	setObservable(subject);

	var client = new WebClient();
	foreach (var request in requests)
	{
		var response = await client.DownloadStringTaskAsync(request); subject.OnNext(response);
	}
}

Однако и в этом случае не обошлось без некоторых костылей – чтобы передать observable вызывающему методу, приходится использовать дополнительный параметр Action. Можно было бы воспользоваться интерфейсом IProgress, но для целостности будем рассматривать только IObservable.

Можно заметить, что на самом деле все эти 3 реализации очень похожи и отличаются лишь способом вызова континуаций (на следующей кликабельной картинке одинаковыми цветами отмечены аналогичные участки кода для каждого варианта).

Вернёмся к нашей асинхронной реализации с использованием observables. Хотелось бы всё-таки сделать её более обобщённой чтобы мы могли использовать использовать её подобно другим функциям высшего порядка из ObservableExtensions. Если ограничиться только EAP, то у любой задачи с последовательными асинхронными операциями есть:

  • функция run: T -> unit, запускающая операцию для очередного элемента входной последовательности
  • событие окончания выполнения операции completed: (object * TEventArgs) -> unit
  • параметрический тип TEventArgs, как правило содержащий информацию о результате выполнения операции или о произошедшей ошибке

Следовательно, можно написать следующую функцию:

public static IObservable<TResult> SelectFromAsyncEventPattern<TSource, TResult, TDelegate, TEventArgs>(
	this IEnumerable<TSource> source,
	Action<TSource> run,
	Func<EventHandler<TEventArgs>, TDelegate> conversion,
	Action<TDelegate> subscribe,
	Action<TDelegate> unsubscribe,
	Func<TEventArgs, TResult> mapResult,
	Func<TEventArgs, Exception> mapException)
	where TEventArgs : EventArgs
{
	var enumerator = source.GetEnumerator();
	var subject = new Subject<TResult>();
	var convertedHandler = default(TDelegate);
	Action takeNext = () =>
	{
		if (enumerator.MoveNext()) { run(enumerator.Current); } else { subject.OnCompleted(); unsubscribe(convertedHandler); }
	};

	EventHandler<TEventArgs> handler = (s, e) =>
	{
		var exception = mapException(e); if (exception != null) { subject.OnError(exception); unsubscribe(convertedHandler); } else { var result = mapResult(e); subject.OnNext(result); takeNext(); }
	};

	convertedHandler = conversion(handler);
	subscribe(convertedHandler);
	takeNext();
	return subject;
}

На первый взгляд сигнатура может показаться не очень понятной и дружелюбной. В частности, пришлось ввести ещё один параметрический тип TDelegate и параметры conversion, subscribe и unsubscribe для обобщения события завершения операции (подобно методу Observable.FromEventPattern из Rx). В остальном код практически идентичен изначальной асинхронной реализации – подписываемся на событие, при каждом срабатывании сообщаем подписчикам о новом значении, аналогично уведомляем их при ошибке или после обработки всех элементов входной последовательности.

Пример использования:

public IObservable<string> HandleAsync(IList<Uri> requests)
{
	var client = new WebClient();
	return requests.SelectFromAsyncEventPattern<Uri, string, DownloadStringCompletedEventHandler, DownloadStringCompletedEventArgs>(
		client.DownloadStringAsync,
		handler => (s, e) => handler(s, e),
		x => client.DownloadStringCompleted += x,
		x => client.DownloadStringCompleted -= x,
		x => x.Result,
		x => x.Error);
}

К сожалению, компилятору не удаётся выполнить автоматический вывод типов, поэтому приходится указывать их вручную. Однако такой код всё равно выглядит лучше множественных вложенных континуаций.

Посмотреть и скачать примеры кода можно на Bitbucket.