Вопрос по multithreading, concurrency – Потоки, не выполняющиеся одновременно на UDP-сервере Netty

4

Код, который я анализирую, создает сервер UDP с Netty NioDatagramChannelFactory. Он создает пул потоков с:

ExecutorService threadPool = Executors.newCachedThreadPool();

Затем канал дейтаграмм, pipeFactory & amp; самонастройки:

int workerCount = 10;
DatagramChannelFactory datagramChannelFactory = new NioDatagramChannelFactory(threadPool, workerCount);
ChannelPipelineFactory pipelineFactory = new SNMPTrapsPipeLineFactory();

ConnectionlessBootstrap bootStrap = new ConnectionlessBootstrap(datagramChannelFactory);
bootStrap.setPipelineFactory(pipelineFactory);
bootStrap.bind(new InetSocketAddress(host, port));

В pipeFactory метод getPipeline () добавляет пользовательские обработчики.

Также как сказано в: Многопоточная обработка сообщений UDP

Есть только один поток, обрабатывающий полученные сообщения. В журналах имена потоков отображаются какNew I/O datagram worker #1 лайк:

2012-04-20 09: 20: 51,853 Новый работник дейтаграммы ввода-вывода # 1 '- -' 1 INFO [c.e.m.r.s.h.SNMPTrapsRequestHandler: 42] messageReceived | Обработка: V1TRAP [reqestID = 0, ...]

Я прочитал документацию и эту запись:Много UDP-запросов потеряно на UDP-сервере с Netty

А потом я немного изменил код в соответствии с этими записями. Теперь пул потоков создается с помощью:

int corePoolSize = 5;
ExecutorService threadPool = new OrderedMemoryAwareThreadPoolExecutor(corePoolSize, 1048576, 1048576);

И конвейер фабрики с и ExecutionHandler:

ExecutionHandler executionHandler = new ExecutionHandler(threadPool);
ChannelPipelineFactory pipelineFactory = new SNMPTrapsPipeLineFactory(executionHandler);

И getPipeline () добавляет обработчик, как описано:

public class SNMPTrapsPipeLineFactory implements ChannelPipelineFactory {

    private ExecutionHandler executionHandler = null;

    public SNMPTrapsPipeLineFactory(ExecutionHandler executionHandler) { 
        this.executionHandler = executionHandler;
    }

    @Override
    public ChannelPipeline getPipeline() throws Exception {

        ChannelPipeline pipeline = Channels.pipeline();
        pipeline.addFirst("ExecutorHandler", executionHandler);

        // Here the custom handlers are added
        pipeline.addLast( ... )
    }

Теперь я получаю 4 разных имени потока в журналах. Они появляются какpool-2-thread-1, pool-2-thread-2, так далее...

Например:

2012-05-09 09: 12: 19,589 ИНФОРМАЦИЯ о pool-2-thread-1 [c.e.m.r.s.h.SNMPTrapsRequestHandler: 46] messageReceived | Обработка: V1TRAP [reqestID = 0, ...]

Но они не обрабатываются одновременно. Обработка в messageReceived () должна завершиться в одном потоке для следующего, чтобы обработать следующее сообщение. Я отправил на сервер кучу сообщений от разных клиентов, и журналы, которые я получаю, не переплетаются. Я также пытался Thread.sleep () внутри messageReceived (), и подтверждает предыдущее.

Я что-то пропустил? Есть ли способ добиться НАСТОЯЩЕГО многопоточного сервера UDP с Netty? Как я могу получить разные потоки для одновременного выполнения messageReceived ()?

Если я не ошибаюсь, то OrderedMemoryAwareThreadPoolExecutor выполняет запросы от того же клиента в том же потоке. kofemann

Ваш Ответ

2   ответа
0

это нормально, что есть только один поток, который обрабатывает сообщения UDP для декодирования. Поскольку UDP не использует сессии, только один поток может принимать данные на один порт UDP и декодировать их.

После того, как вы декодировали свои данные и поместили их в буфер или определенный объект Java, вы можете поместить этот объект в пул потоков, которые будут обрабатывать его (обработчик выполнения -> ваш бизнес-обработчик). Затем новые поступающие данные на порт UDP могут быть декодированы после того, как вы выпустили предыдущие декодированные данные в обработчик выполнения.

Поток пулов, который вы можете указать при создании NioDatagramChannelFactory, используется только тогда, когда вы прослушиваете данные на нескольких портах. Только один поток на порт имеет смысл. Даже если вы укажете 100 рабочих в этом конструкторе, будет использоваться только один, если вы настроили один порт UDP.

Не хотели бы вы опубликовать этот код в качестве ответа? Я согласен с вами в том, что, даже если имеется один принимающий порт, может быть много источников, поэтому многопоточность не только имеет смысл, но и представляется необходимой для обслуживания большого количества клиентов.
Кажется, вы правы, только один поток на порт. Но для меня это НЕ имеет смысла ... Похоже, именно так устроен Netty. Я должен был добавить свой & quot; Код темы & quot; вести себя так, когда я использую фреймворк, который обрабатывает пулы потоков с триллионами опций ... Я не могу в это поверить ... nephewtom
0

которая выскочила на меня, это то, что вы поставили свой обработчик выполненияfirst в трубопроводе. Я полагаю, что цель заключается в том, чтобы весь конвейер вплоть до «приложения» обработчик должен выполняться потоками ввода-вывода, выполняющими ввод-выводand расшифровка.

Поэтому я бы сказал, что сначала вы захотите добавить все ваши обработчики декодирования SNMPTrap, а затем, когда у вас есть реальный SNMPTrap, он передается обработчику выполнения, который, в свою очередь, передает ловушку фактическим потребителям ловушек. сделать что-то полезное с.

@Override
public ChannelPipeline getPipeline() throws Exception {

    ChannelPipeline pipeline = Channels.pipeline(
         new SomethingSomethingDecoder(),
         new SNMPTrapDecoder(),
         executionHandler.
         snmpTrapConsumerHandler
    );
}

По крайней мере, так показано наExecutionHandler Javadoc, и выше, моя интерпретация этого.

Да. Я попробовал этот подход, описанный в документации, и он не сработал. Затем я попытался добавить executeHandler с помощью addFirst (), как описано в ответе на вопрос: много UDP-запросов потеряно на UDP-сервере с Netty. Но никто из них не работает. nephewtom

Похожие вопросы