Вопрос по async-await, c#, asynchronous – Как ограничить количество одновременных операций асинхронного ввода-вывода?

91
// let's say there is a list of 1000+ URLs
string[] urls = { "http://google.com", "http://yahoo.com", ... };

// now let's send HTTP requests to each of these URLs in parallel
urls.AsParallel().ForAll(async (url) => {
    var client = new HttpClient();
    var html = await client.GetStringAsync(url);
});

Здесь проблема, он запускает более 1000 одновременных веб-запросов. Есть ли простой способ ограничить количество одновременных http-запросов? Таким образом, в любой момент времени загружается не более 20 веб-страниц. Как это сделать максимально эффективно?

Error: User Rate Limit Exceededyour previous question? svick
stackoverflow.com/questions/9290498/…Error: User Rate Limit Exceeded Chris Disley
Error: User Rate Limit Exceededstackoverflow.com/a/10802883/66372 eglasius
Error: User Rate Limit ExceededHttpClientError: User Rate Limit ExceededIDisposableError: User Rate Limit ExceededHttpClientError: User Rate Limit Exceeded Shimmy
Error: User Rate Limit Exceeded spender

Ваш Ответ

12   ответов
0

Error: User Rate Limit Exceeded

Error: User Rate Limit ExceededRunningError: User Rate Limit Exceeded
Error: User Rate Limit ExceededawaitError: User Rate Limit Exceeded
Error: User Rate Limit Exceeded Grief Coder
Error: User Rate Limit Exceeded
Error: User Rate Limit Exceeded Grief Coder
4

Error: User Rate Limit Exceeded

 class SomeChecker
 {
    private const int ThreadCount=20;
    private CountdownEvent _countdownEvent;
    private SemaphoreSlim _throttler;

    public Task Check(IList<string> urls)
    {
        _countdownEvent = new CountdownEvent(urls.Count);
        _throttler = new SemaphoreSlim(ThreadCount); 

        return Task.Run( // prevent UI thread lock
            async  () =>{
                foreach (var url in urls)
                {
                    // do an async wait until we can schedule again
                    await _throttler.WaitAsync();
                    ProccessUrl(url); // NOT await
                }
                //instead of await Task.WhenAll(allTasks);
                _countdownEvent.Wait();
            });
    }

    private async Task ProccessUrl(string url)
    {
        try
        {
            var page = await new WebClient()
                       .DownloadStringTaskAsync(new Uri(url)); 
            ProccessResult(page);
        }
        finally
        {
            _throttler.Release();
            _countdownEvent.Signal();
        }
    }

    private void ProccessResult(string page){/*....*/}
}
Error: User Rate Limit ExceededProccessUrlError: User Rate Limit ExceededCheck(...)Error: User Rate Limit ExceededWhenAllError: User Rate Limit ExceededWhenAnyError: User Rate Limit Exceeded
0

EDIT

Error: User Rate Limit Exceeded
Error: User Rate Limit Exceeded
Error: User Rate Limit Exceeded
Error: User Rate Limit ExceededoneError: User Rate Limit ExceededalwaysError: User Rate Limit Exceeded
Error: User Rate Limit Exceeded Grief Coder
4

Error: User Rate Limit ExceededError: User Rate Limit ExceededError: User Rate Limit Exceeded

// let's say there is a list of 1000+ URLs
string[] urls = { "http://google.com", "http://yahoo.com", ... };

// now let's send HTTP requests to each of these URLs in parallel
await urls.ParallelForEachAsync(async (url) => {
    var client = new HttpClient();
    var html = await client.GetStringAsync(url);
}, maxDegreeOfParallelism: 20);
133

Error: User Rate Limit Exceeded

Error: User Rate Limit ExceededSemaphoreSlimError: User Rate Limit ExceededSemaphoreError: User Rate Limit ExceededWaitAsync(...)Error: User Rate Limit Exceeded

Error: User Rate Limit ExceededError: User Rate Limit Exceeded.

Error: User Rate Limit Exceeded

public async Task MyOuterMethod()
{
    // let's say there is a list of 1000+ URLs
    var urls = { "http://google.com", "http://yahoo.com", ... };

    // now let's send HTTP requests to each of these URLs in parallel
    var allTasks = new List<Task>();
    var throttler = new SemaphoreSlim(initialCount: 20);
    foreach (var url in urls)
    {
        // do an async wait until we can schedule again
        await throttler.WaitAsync();

        // using Task.Run(...) to run the lambda in its own parallel
        // flow on the threadpool
        allTasks.Add(
            Task.Run(async () =>
            {
                try
                {
                    var client = new HttpClient();
                    var html = await client.GetStringAsync(url);
                }
                finally
                {
                    throttler.Release();
                }
            }));
    }

    // won't get here until all urls have been put into tasks
    await Task.WhenAll(allTasks);

    // won't get here until all tasks have completed in some way
    // (either success or exception)
}

Error: User Rate Limit Exceeded

Error: User Rate Limit ExceededError: User Rate Limit Exceeded.

Error: User Rate Limit ExceededHttpClient
Error: User Rate Limit Exceededmsdn.microsoft.com/en-us/library/…
Error: User Rate Limit Exceeded
Error: User Rate Limit ExceededParallel.ForEachError: User Rate Limit Exceeded
Error: User Rate Limit ExceededWaitAsyncError: User Rate Limit ExceededWaitAsyncError: User Rate Limit Exceeded
8

Error: User Rate Limit Exceeded

Error: User Rate Limit ExceededError: User Rate Limit ExceededError: User Rate Limit Exceeded

Error: User Rate Limit Exceeded
Error: User Rate Limit Exceeded
Error: User Rate Limit Exceeded
Error: User Rate Limit Exceeded
2

Error: User Rate Limit Exceeded

    /// <summary>
    /// Concurrently Executes async actions for each item of <see cref="IEnumerable<typeparamref name="T"/>
    /// </summary>
    /// <typeparam name="T">Type of IEnumerable</typeparam>
    /// <param name="enumerable">instance of <see cref="IEnumerable<typeparamref name="T"/>"/></param>
    /// <param name="action">an async <see cref="Action" /> to execute</param>
    /// <param name="maxActionsToRunInParallel">Optional, max numbers of the actions to run in parallel,
    /// Must be grater than 0</param>
    /// <returns>A Task representing an async operation</returns>
    /// <exception cref="ArgumentOutOfRangeException">If the maxActionsToRunInParallel is less than 1</exception>
    public static async Task ForEachAsyncConcurrent<T>(
        this IEnumerable<T> enumerable,
        Func<T, Task> action,
        int? maxActionsToRunInParallel = null)
    {
        if (maxActionsToRunInParallel.HasValue)
        {
            using (var semaphoreSlim = new SemaphoreSlim(
                maxActionsToRunInParallel.Value, maxActionsToRunInParallel.Value))
            {
                var tasksWithThrottler = new List<Task>();

                foreach (var item in enumerable)
                {
                    // Increment the number of currently running tasks and wait if they are more than limit.
                    await semaphoreSlim.WaitAsync();

                    tasksWithThrottler.Add(Task.Run(async () =>
                    {
                        await action(item).ContinueWith(res =>
                        {
                            // action is completed, so decrement the number of currently running tasks
                            semaphoreSlim.Release();
                        });
                    }));
                }

                // Wait for all of the provided tasks to complete.
                await Task.WhenAll(tasksWithThrottler.ToArray());
            }
        }
        else
        {
            await Task.WhenAll(enumerable.Select(item => action(item)));
        }
    }

Error: User Rate Limit Exceeded

await enumerable.ForEachAsyncConcurrent(
    async item =>
    {
        await SomeAsyncMethod(item);
    },
    5);
2

Error: User Rate Limit ExceededError: User Rate Limit Exceeded:

static async Task WhenAll(IEnumerable<Task> tasks, int maxThreadCount) {
    using (var guard = new SemaphoreSlim(initialCount: maxThreadCount)) {
        await Task.WhenAll(tasks.Select(async task => {
            await guard.WaitAsync();

            return task.ContinueWith(t => guard.Release());
        }));
    }
}
6

Error: User Rate Limit Exceeded

Error: User Rate Limit Exceeded

public static Task ForEachAsync<TIn>(
        IEnumerable<TIn> inputEnumerable,
        Func<TIn, Task> asyncProcessor,
        int? maxDegreeOfParallelism = null)
    {
        int maxAsyncThreadCount = maxDegreeOfParallelism ?? DefaultMaxDegreeOfParallelism;
        SemaphoreSlim throttler = new SemaphoreSlim(maxAsyncThreadCount,, maxAsyncThreadCount);

        IEnumerable<Task> tasks = inputEnumerable.Select(async input =>
        {
            await throttler.WaitAsync().ConfigureAwait(false);
            try
            {
                await asyncProcessor(input).ConfigureAwait(false);
            }
            finally
            {
                throttler.Release();
            }
        });

        return Task.WhenAll(tasks);
    }
-1

Error: User Rate Limit Exceeded

Error: User Rate Limit ExceededError: User Rate Limit Exceeded

With Actions

Error: User Rate Limit Exceeded

var listOfActions = n,ew List<Action>();
foreach (var url in urls)
{
    var localUrl = url;
    // Note that we create the Task here, but do not start it.
    listOfTasks.Add(new Task(() => CallUrl(localUrl)));
}

var options = new ParallelOptions {MaxDegreeOfParallelism = 20};
Parallel.Invoke(options, listOfActions.ToArray());

With Tasks

Error: User Rate Limit Exceeded

    /// <summary>
    /// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel.
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
    /// </summary>
    /// <param name="tasksToRun">The tasks to run.</param>
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, CancellationToken cancellationToken = new CancellationToken())
    {
        await StartAndWaitAllThrottledAsync(tasksToRun, maxTasksToRunInParallel, -1, cancellationToken);
    }

    /// <summary>
    /// Starts the given tasks and waits for them to complete. This will run the specified number of tasks in parallel.
    /// <para>NOTE: If a timeout is reached before the Task completes, another Task may be started, potentially running more than the specified maximum allowed.</para>
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
    /// </summary>
    /// <param name="tasksToRun">The tasks to run.</param>
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
    /// <param name="timeoutInMilliseconds">The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken())
    {
        // Convert to a list of tasks so that we don't enumerate over it multiple times needlessly.
        var tasks = tasksToRun.ToList();

        using (var throttler = new SemaphoreSlim(maxTasksToRunInParallel))
        {
            var postTaskTasks = new List<Task>();

            // Have each task notify the throttler when it completes so that it decrements the number of tasks currently running.
            tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release())));

            // Start running each task.
            foreach (var task in tasks)
            {
                // Increment the number of tasks currently running and wait if too many are running.
                await throttler.WaitAsync(timeoutInMilliseconds, cancellationToken);

                cancellationToken.ThrowIfCancellationRequested();
                task.Start();
            }

            // Wait for all of the provided tasks to complete.
            // We wait on the list of "post" tasks instead of the original tasks, otherwise there is a potential race condition where the throttler's using block is exited before some Tasks have had their "post" action completed, which references the throttler, resulting in an exception due to accessing a disposed object.
            await Task.WhenAll(postTaskTasks.ToArray());
        }
    }

Error: User Rate Limit Exceeded

var listOfTasks = new List<Task>();
foreach (var url in urls)
{
    var localUrl = url;
    // Note that we create the Task here, but do not start it.
    listOfTasks.Add(new Task(async () => await CallUrl(localUrl)));
}
await Tasks.StartAndWaitAllThrottledAsync(listOfTasks, 20);
Error: User Rate Limit Exceeded
0

Error: User Rate Limit ExceededMaxDegreeOfParallelismError: User Rate Limit ExceededParallel.ForEach():

var options = new ParallelOptions { MaxDegreeOfParallelism = 20 };

Parallel.ForEach(urls, options,
    url =>
        {
            var client = new HttpClient();
            var html = client.GetStringAsync(url);
            // do stuff with html
        });
Error: User Rate Limit ExceededGetStringAsync(url)Error: User Rate Limit ExceededawaitError: User Rate Limit Exceededvar htmlError: User Rate Limit ExceededTask<string>Error: User Rate Limit Exceededstring.
Error: User Rate Limit ExceededParallel.ForEach(...)Error: User Rate Limit ExceededsynchronousError: User Rate Limit Exceeded
0

Error: User Rate Limit ExceededError: User Rate Limit ExceededError: User Rate Limit Exceeded

Похожие вопросы