using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;

namespace Octokit.Reactive.Internal
{
    /// <summary>
    /// Reactive helpers that turn a paged request made through an <see cref="IConnection"/>
    /// into a single observable sequence of items.
    /// </summary>
    // NOTE(review): the generic type arguments in this file (T, IDictionary<string, string>,
    // IResponse<List<T>>) were restored from context; confirm them against the declaration
    // of IConnection.GetAsync.
    internal static class ConnectionExtensions
    {
        /// <summary>
        /// Requests <paramref name="url"/>, keeps requesting the "next page" URL advertised by
        /// each response until none is left, and emits the items of every page as one sequence.
        /// </summary>
        /// <typeparam name="T">Type of the items contained in each page.</typeparam>
        /// <param name="connection">Connection used to issue every page request.</param>
        /// <param name="url">URL of the first page.</param>
        /// <param name="parameters">
        /// Parameters passed with the first request only; follow-up pages are requested with
        /// <c>null</c> parameters.
        /// </param>
        /// <param name="accepts">Value forwarded unchanged to every <c>GetAsync</c> call.</param>
        /// <returns>An observable that emits the items of all pages.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="connection"/> or <paramref name="url"/> is <c>null</c>.
        /// </exception>
        /// <remarks>
        /// The first page is requested as soon as this method is called; later pages are
        /// wrapped in <c>Observable.Defer</c> and are only requested once subscribed to.
        /// </remarks>
        public static IObservable<T> GetAndFlattenAllPages<T>(
            this IConnection connection,
            Uri url,
            IDictionary<string, string> parameters = null,
            string accepts = null)
        {
            // Fail fast with a descriptive exception instead of a NullReferenceException
            // raised from inside the page-fetching lambda.
            if (connection == null)
            {
                throw new ArgumentNullException("connection");
            }

            if (url == null)
            {
                throw new ArgumentNullException("url");
            }

            return GetPages<T>(
                url,
                parameters,
                (pageUrl, pageParams) => connection.GetAsync<List<T>>(pageUrl, pageParams, accepts).ToObservable());
        }

        /// <summary>
        /// Walks a chain of pages using <paramref name="getPageFunc"/> and flattens the page
        /// bodies into a single sequence.
        /// </summary>
        /// <typeparam name="T">Type of the items contained in each page.</typeparam>
        /// <param name="uri">URL of the first page.</param>
        /// <param name="parameters">Parameters for the first page request.</param>
        /// <param name="getPageFunc">
        /// Function that requests one page for a URL and an optional parameter dictionary.
        /// </param>
        /// <returns>An observable that emits the items of all pages.</returns>
        static IObservable<T> GetPages<T>(
            Uri uri,
            IDictionary<string, string> parameters,
            Func<Uri, IDictionary<string, string>, IObservable<IResponse<List<T>>>> getPageFunc)
        {
            return getPageFunc(uri, parameters)
                .Expand(resp =>
                {
                    // Expand runs this selector for every response, including the ones it
                    // produces itself. A null response has no paging information, so stop
                    // here; the Where below then removes it from the output.
                    if (resp == null)
                    {
                        return Observable.Empty<IResponse<List<T>>>();
                    }

                    var nextPageUrl = resp.ApiInfo.GetNextPageUrl();
                    if (nextPageUrl == null)
                    {
                        // Last page reached: nothing more to expand.
                        return Observable.Empty<IResponse<List<T>>>();
                    }

                    // Defer so the next request is only issued when Expand subscribes to it.
                    // NOTE(review): parameters are dropped for follow-up pages, presumably
                    // because the next-page URL already carries them - confirm.
                    return Observable.Defer(() => getPageFunc(nextPageUrl, null));
                })
                .Where(resp => resp != null)
                .SelectMany(resp => resp.BodyAsObject);
        }
    }
}