Вопрос по java, multithreading – ThreadPoolExecutor с ArrayBlockingQueue

5

Я начал читать больше о ThreadPoolExecutor из Java Doc, поскольку я использую его в одном из моих проектов. Так может ли кто-нибудь объяснить мне, что на самом деле означает эта строка? - Я знаю, что означает каждый параметр, но я хотел понять это более общими / непрофессиональными способами из некоторых экспертов здесь.

ExecutorService service = new ThreadPoolExecutor(10, 10, 1000L,
TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10, true), new 
ThreadPoolExecutor.CallerRunsPolicy());

Updated:- Постановка проблемы:

Каждый поток использует уникальный идентификатор от 1 до 1000, и программа должна работать в течение 60 или более минут, поэтому через эти 60 минут возможно, что все идентификаторы будут завершены, поэтому мне нужно повторно использовать эти идентификаторы снова. Так что это нижеприведенная программа, которую я написал с помощью вышеприведенного исполнителя.

class IdPool {
    private final LinkedList<Integer> availableExistingIds = new LinkedList<Integer>();

    public IdPool() {
        for (int i = 1; i <= 1000; i++) {
            availableExistingIds.add(i);
        }
    }

    public synchronized Integer getExistingId() {
        return availableExistingIds.removeFirst();
    }

    public synchronized void releaseExistingId(Integer id) {
        availableExistingIds.add(id);
    }
}


class ThreadNewTask implements Runnable {
    private IdPool idPool;

    public ThreadNewTask(IdPool idPool) {
        this.idPool = idPool;
    }

    public void run() {
        Integer id = idPool.getExistingId();
        someMethod(id);
        idPool.releaseExistingId(id);
    }

// This method needs to be synchronized or not?
    private synchronized void someMethod(Integer id) {
        System.out.println("Task: " +id);
// and do other calcuations whatever you need to do in your program
    }
}

public class TestingPool {
    public static void main(String[] args) throws InterruptedException {
        int size = 10;
        int durationOfRun = 60;
        IdPool idPool = new IdPool();   
        // create thread pool with given size
        ExecutorService service = new ThreadPoolExecutor(size, size, 500L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(size), new ThreadPoolExecutor.CallerRunsPolicy()); 

        // queue some tasks
        long startTime = System.currentTimeMillis();
        long endTime = startTime + (durationOfRun * 60 * 1000L);

        // Running it for 60 minutes
        while(System.currentTimeMillis() <= endTime) {
            service.submit(new ThreadNewTask(idPool));
        }

        // wait for termination        
        service.shutdown();
        service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 
    }
}

Мои вопросы: - Этот код является правильным, насколько рассматривается производительность или нет? И что еще я могу сделать это здесь, чтобы сделать это более точным? Любая помощь будет оценена.

Я не вижу ничего плохого в вашем коде, кроме, возможно, наличия попытки / наконец в вашем & quot; run & quot; метод, гарантирующий, что идентификаторы всегда будут освобождены (например, когда у вас есть более сложный код в "someMethod"). Matt
А такжеidPool.releaseExistingId(id); должен прийти в наконец блок правильно? arsenal
Почемуthese three questions так похоже? ...Problem Statement is:- Each thread uses unique ID between 1 and 1000 ... OldCurmudgeon
@ Matt, спасибо за комментирование, значит, некоторые методы должны быть синхронизированы? или нет? Как и в моем случае, я сделал это синхронизировано. arsenal

Ваш Ответ

3   ответа
9

я прошу прощения, это ответ на предыдущий ответ, но я хотел форматирования].

За исключением реальности, вы НЕ блокируете блок, когда элемент передается в ThreadPoolExecutor с полной очередью. Причина этого в том, что ThreadPoolExecutor вызывает метод BlockingQueue.offer (T item), который по определению является неблокирующим методом. Он либо добавляет элемент и возвращает значение true, либо не добавляет (при заполнении) и возвращает значение false. Затем ThreadPoolExecutor вызывает зарегистрированного RejectedExecutionHandler для решения этой ситуации.

От Javadoc:

Executes the given task sometime in the future. The task may execute in a new thread or in an existing pooled thread. If the task cannot be submitted for execution, either because this executor has been shutdown or because its capacity has been reached, the task is handled by the current RejectedExecutionHandler.

По умолчанию используется ThreadPoolExecutor.AbortPolicy (), который генерирует исключение RejectedExecutionException из & quot; submit & quot; или "выполнить" метод ThreadPoolExecutor.

try {
   executorService.execute(new Runnable() { ... });
}
catch (RejectedExecutionException e) {
   // the queue is full, and you're using the AbortPolicy as the 
   // RejectedExecutionHandler
}

Однако вы можете использовать другие обработчики, чтобы сделать что-то другое, например игнорировать ошибку (DiscardPolicy), или запустить ее в потоке, который называется & quot; execute & quot; или "отправить" метод (CallerRunsPolicy). Этот пример позволяет любому потоку вызвать & quot; представить & quot; или "выполнить" Метод запуска запрошенной задачи, когда очередь заполнена. (это означает, что в любой момент времени вы можете запустить еще одну вещь поверх того, что находится в самом пуле):

ExecutorService service = new ThreadPoolExecutor(..., new ThreadPoolExecutor.CallerRunsPolicy());

Если вы хотите заблокировать и подождать, вы можете реализовать свой собственный RejectedExecutionHandler, который будет блокировать до тех пор, пока в очереди не появится слот (это приблизительная оценка, я не скомпилировал или не запустил это, но вы должны понять):

public class BlockUntilAvailableSlot implements RejectedExecutionHandler {
  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
     if (e.isTerminated() || e.isShutdown()) {
        return;
     }

     boolean submitted = false;
     while (! submitted) {
       if (Thread.currentThread().isInterrupted()) {
            // be a good citizen and do something nice if we were interrupted
            // anywhere other than during the sleep method.
       }

       try {
          e.execute(r);
          submitted = true;
       }
       catch (RejectedExceptionException e) {
         try {
           // Sleep for a little bit, and try again.
           Thread.sleep(100L);
         }
         catch (InterruptedException e) {
           ; // do you care if someone called Thread.interrupt?
           // if so, do something nice here, and maybe just silently return.
         }
       }
     }
  }
}
Error: User Rate Limit Exceeded arsenal
Error: User Rate Limit Exceeded
2

ExecutorService который обрабатывает выполнение пула потоков. В этом случае начальное и максимальное количество потоков в пуле равно 10. Когда поток в пуле становится бездействующим в течение 1 секунды (1000 мс), он уничтожает его (таймер простоя), однако, поскольку максимальное и основное число потоков одинаково, этого никогда не произойдет (он всегда поддерживает 10 потоков и будет никогда не запускайте более 10 потоков).

Он используетArrayBlockingQueue управлять запросами на выполнение с 10 слотами, поэтому, когда очередь заполнена (после того, как 10 потоков были поставлены в очередь), она будет блокировать вызывающего.

Если поток отклонен (что в этом случае будет связано с закрытием службы, поскольку потоки будут поставлены в очередь, или вы будете заблокированы при постановке в очередь потока, если очередь заполнена), тогда предлагаетсяRunnable будет выполняться в потоке вызывающего.

Error: User Rate Limit Exceeded arsenal
Error: User Rate Limit Exceeded arsenal
Error: User Rate Limit Exceeded arsenal
Error: User Rate Limit Exceeded
0

проверьте код ниже, используя семафор. Не уверен, что это то, что вы хотите. Но это заблокирует, если больше нет разрешений на приобретение. Также важен ли для вас ID?

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ThreadNewTask implements Runnable {
    private Semaphore idPool;

    public ThreadNewTask(Semaphore idPool) {
        this.idPool = idPool;
    }

    public void run() {
//      Integer id = idPool.getExistingId();
        try {
            idPool.acquire();
            someMethod(0);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            idPool.release();
        }
//      idPool.releaseExistingId(id);
    }

    // This method needs to be synchronized or not?
    private void someMethod(Integer id) {
        System.out.println("Task: " + id);
        // and do other calcuations whatever you need to do in your program
    }
}

public class TestingPool {
    public static void main(String[] args) throws InterruptedException {
        int size = 10;
        int durationOfRun = 60;
        Semaphore idPool = new Semaphore(100); 
//      IdPool idPool = new IdPool();
        // create thread pool with given size
        ExecutorService service = new ThreadPoolExecutor(size, size, 500L,
                TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(size),
                new ThreadPoolExecutor.CallerRunsPolicy());

        // queue some tasks
        long startTime = System.currentTimeMillis();
        long endTime = startTime + (durationOfRun * 60 * 1000L);

        // Running it for 60 minutes
        while (System.currentTimeMillis() <= endTime) {
            service.submit(new ThreadNewTask(idPool));
        }

        // wait for termination
        service.shutdown();
        service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
    }
}
Error: User Rate Limit Exceeded arsenal
Error: User Rate Limit Exceeded
Error: User Rate Limit Exceeded arsenal
Error: User Rate Limit Exceeded arsenal

Похожие вопросы