Вопрос по java – Селекторы java.nio и SocketChannel для продолжения потоковой передачи

6

В настоящее время я использую java.nio.channel.Selectors & amp; SocketChannels для приложения, которое откроет соединения 1-ко-многим для продолжения потоковой передачи на сервер. У меня есть три потока для моего приложения: StreamWriteWorker - выполняет операцию записи в SocketChannel, StreamReadWorker - читает байты из буфера и анализирует содержимое, а StreamTaskDispatcher - выполняет выбор селектора для readyOps и отправляет новые исполняемые элементы для рабочих потоков.

Проблема - вызов метода выбора селектора возвращает только значение & gt; 0 (действительные готовые операции) при первом вызове; Я могу выполнить запись и отправку данных по всем готовым каналам за один раз, но все последующие вызовы метода выбора Selector возвращают 0.

Вопрос: Нужно ли вызывать close на SocketChannel после каждого чтения / записи (надеюсь, что нет!)? Если нет, то что может быть причиной недоступности SocketChannels для операций чтения / записи?

Мне жаль, что я не могу опубликовать код, но я надеюсь, что объяснил проблему достаточно ясно, чтобы кто-то помог. Я искал ответы и вижу, что вы не можете повторно использовать соединение SocketChannel после его закрытия, но мой канал не должен быть закрыт, сервер никогда не получит результат потока EOF.

Я добился некоторого прогресса и выяснил, что операция записи не выполнялась в приложении сервера из-за ошибки синтаксического анализа json. Так что теперь мой SocketChannel в коде клиентского приложения становится готовым к другой операции записи после того, как он обработал операцию чтения. Я предполагаю, что это TCP-природа SocketChannels. Однако SocketChannel не становится доступным для другой операции чтения на стороне приложения сервера. Это нормальное поведение для SocketChannels? Нужно ли закрывать соединение на стороне клиента после операции чтения и устанавливать новое соединение?

Вот пример кода того, что я пытаюсь сделать:

package org.stream.socket;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.util.HashMap;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.commons.lang3.RandomStringUtils;

import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
import com.google.gson.stream.JsonToken;

public class ClientServerTest {

    private LinkedBlockingQueue<byte[]> dataQueue = new LinkedBlockingQueue<byte[]>();
    private ExecutorService executor = Executors.newFixedThreadPool(1);
    private HashMap<String, Integer> uuidToSize = new HashMap<String, Integer>();

    private class StreamWriteTask implements Runnable {
        private ByteBuffer buffer;
        private SelectionKey key;
        private Selector selector;

        private StreamWriteTask(ByteBuffer buffer, SelectionKey key, Selector selector) {
            this.buffer = buffer;
            this.key = key;
            this.selector = selector;
        }

        @Override
        public void run() {
            SocketChannel sc = (SocketChannel) key.channel();
            byte[] data = (byte[]) key.attachment();
            buffer.clear();
            buffer.put(data);
            buffer.flip();
            int results = 0;
            while (buffer.hasRemaining()) {
                try {
                    results = sc.write(buffer);
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

                if (results == 0) {
                    buffer.compact();
                    buffer.flip();
                    data = new byte[buffer.remaining()];
                    buffer.get(data);
                    key.interestOps(SelectionKey.OP_WRITE);
                    key.attach(data);
                    selector.wakeup();
                    return;
                }
            }

            key.interestOps(SelectionKey.OP_READ);
            key.attach(null);
            selector.wakeup();
        }

    }

    private class StreamReadTask implements Runnable {
        private ByteBuffer buffer;
        private SelectionKey key;
        private Selector selector;

        private StreamReadTask(ByteBuffer buffer, SelectionKey key, Selector selector) {
            this.buffer = buffer;
            this.key = key;
            this.selector = selector;
        }

        private boolean checkUUID(byte[] data) {
            return uuidToSize.containsKey(new String(data));
        }

        @Override
        public void run() {
            SocketChannel sc = (SocketChannel) key.channel();
            buffer.clear();
            byte[] data = (byte[]) key.attachment();
            if (data != null) {
                buffer.put(data);
            }
            int count = 0;
            int readAttempts = 0;
            try {
                while ((count = sc.read(buffer)) > 0) {
                    readAttempts++;
                }
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

            if (count == 0) {
                buffer.flip();
                data = new byte[buffer.limit()];
                buffer.get(data);
                if (checkUUID(data)) {
                    key.interestOps(SelectionKey.OP_READ);
                    key.attach(data);
                } else {
                    System.out.println("Clinet Read - uuid ~~~~ " + new String(data));
                    key.interestOps(SelectionKey.OP_WRITE);
                    key.attach(null);
                }
            }

            if (count == -1) {
                try {
                    sc.close();
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }

            selector.wakeup();
        }

    }

    private class ClientWorker implements Runnable {

        @Override
        public void run() {
            try {
                Selector selector = Selector.open();
                SocketChannel sc = SocketChannel.open();
                sc.configureBlocking(false);
                sc.connect(new InetSocketAddress("127.0.0.1", 9001));
                sc.register(selector, SelectionKey.OP_CONNECT);
                ByteBuffer buffer = ByteBuffer.allocateDirect(65535);

                while (selector.isOpen()) {
                    int count = selector.select(10);

                    if (count == 0) {
                        continue;
                    }

                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();

                    while (it.hasNext()) {
                        final SelectionKey key = it.next();
                        it.remove();
                        if (!key.isValid()) {
                            continue;
                        }

                        if (key.isConnectable()) {
                            sc = (SocketChannel) key.channel();
                            if (!sc.finishConnect()) {
                                continue;
                            }
                            sc.register(selector, SelectionKey.OP_WRITE);
                        }

                        if (key.isReadable()) {
                            key.interestOps(0);
                            executor.execute(new StreamReadTask(buffer, key, selector));
                        }
                        if (key.isWritable()) {
                            key.interestOps(0);
                            if(key.attachment() == null){
                                key.attach(dataQueue.take());
                            }
                            executor.execute(new StreamWriteTask(buffer, key, selector));
                        }
                    }
                }
            } catch (IOException ex) {
                // Handle Exception
            }catch(InterruptedException ex){

            }

        }
    }

    private class ServerWorker implements Runnable {
        @Override
        public void run() {
            try {
                Selector selector = Selector.open();
                ServerSocketChannel ssc = ServerSocketChannel.open();
                ServerSocket socket = ssc.socket();
                socket.bind(new InetSocketAddress(9001));
                ssc.configureBlocking(false);
                ssc.register(selector, SelectionKey.OP_ACCEPT);
                ByteBuffer buffer = ByteBuffer.allocateDirect(65535);
                DataHandler handler = new DataHandler();

                while (selector.isOpen()) {
                    int count = selector.select(10);

                    if (count == 0) {
                        continue;
                    }

                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();

                    while (it.hasNext()) {
                        final SelectionKey key = it.next();
                        it.remove();
                        if (!key.isValid()) {
                            continue;
                        }

                        if (key.isAcceptable()) {
                            ssc = (ServerSocketChannel) key.channel();
                            SocketChannel sc = ssc.accept();
                            sc.configureBlocking(false);
                            sc.register(selector, SelectionKey.OP_READ);
                        }
                        if (key.isReadable()) {
                            handler.readSocket(buffer, key);
                        }
                        if (key.isWritable()) {
                            handler.writeToSocket(buffer, key);
                        }
                    }
                }

            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

    }

    private class DataHandler {

        private JsonObject parseData(StringBuilder builder) {
            if (!builder.toString().endsWith("}")) {
                return null;
            }

            JsonParser parser = new JsonParser();
            JsonObject obj = (JsonObject) parser.parse(builder.toString());
            return obj;
        }

        private void readSocket(ByteBuffer buffer, SelectionKey key)
                throws IOException {
            SocketChannel sc = (SocketChannel) key.channel();
            buffer.clear();
            int count = Integer.MAX_VALUE;
            int readAttempts = 0;
            try {
                while ((count = sc.read(buffer)) > 0) {
                    readAttempts++;
                }
            } catch (IOException e) {
                e.printStackTrace();
            }

            if (count == 0) {
                buffer.flip();
                StringBuilder builder = key.attachment() instanceof StringBuilder ? (StringBuilder) key
                        .attachment() : new StringBuilder();
                Charset charset = Charset.forName("UTF-8");
                CharsetDecoder decoder = charset.newDecoder();
                decoder.onMalformedInput(CodingErrorAction.IGNORE);
                System.out.println(buffer);
                CharBuffer charBuffer = decoder.decode(buffer);
                String content = charBuffer.toString();
                charBuffer = null;
                builder.append(content);    
                System.out.println(content);
                JsonObject obj = parseData(builder);
                if (obj == null) {
                    key.attach(builder);
                    key.interestOps(SelectionKey.OP_READ);
                } else {
                    System.out.println("data ~~~~~~~ " + builder.toString());
                    JsonPrimitive uuid = obj.get("uuid").getAsJsonPrimitive();
                    key.attach(uuid.toString().getBytes());
                    key.interestOps(SelectionKey.OP_WRITE);
                }
            }

            if (count == -1) {
                key.attach(null);
                sc.close();
            }
        }

        private void writeToSocket(ByteBuffer buffer, SelectionKey key)
                throws IOException {
            SocketChannel sc = (SocketChannel) key.channel();
            byte[] data = (byte[]) key.attachment();
            buffer.clear();
            buffer.put(data);
            buffer.flip();
            int writeAttempts = 0;
            while (buffer.hasRemaining()) {
                int results = sc.write(buffer);
                writeAttempts++;
                System.out.println("Write Attempt #" + writeAttempts);
                if (results == 0) {
                    buffer.compact();
                    buffer.flip();
                    data = new byte[buffer.remaining()];
                    buffer.get(data);
                    key.attach(data);
                    key.interestOps(SelectionKey.OP_WRITE);
                    break;
                }
            }

            key.interestOps(SelectionKey.OP_READ);
            key.attach(null);
        }
    }

    public ClientServerTest() {
        for (int index = 0; index < 1000; index++) {
            JsonObject obj = new JsonObject();
            String uuid = UUID.randomUUID().toString();
            uuidToSize.put(uuid, uuid.length());
            obj.addProperty("uuid", uuid);
            String data = RandomStringUtils.randomAlphanumeric(10000);
            obj.addProperty("event", data);
            dataQueue.add(obj.toString().getBytes());
        }

        Thread serverWorker = new Thread(new ServerWorker());
        serverWorker.start();

        Thread clientWorker = new Thread(new ClientWorker());
        clientWorker.start();

    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        ClientServerTest test = new ClientServerTest();
        for(;;){

        }
    }

}
Я не уверен, что заявленная вами проблема, которая мотивирует эти три темы, даже существует. Вы вnon-blocking mode. Ничего не будет ждать, кроме вызова select (). Но если ваш select () возвращает ноль, ничего не готово. Случайное закрытие каналов не изменит этого. user207421
Канал становится готовым к записи, когда в его выходном буфере есть место. Выtold об этом селектором, если и только если вы зарегистрированы на OP_WRITE в то время. Я все еще не понимаю ваш последний вопрос. Я полагаю, что некоторые вы разместили код. user207421
Как вы думаете, зачем вам эти три темы? Вам, конечно, не нужен поток записи, и с небольшой реструктуризацией в соответствии с намерениями NIO вы также сможете избавиться от потока чтения. Многопоточность и NIO действительно не смешиваются. Если вы хотите многопоточность, используйте java.net и блокируйте ввод / вывод. user207421
Спасибо за ваш комментарий. Я начал с трех потоков, потому что своевременность является важным фактором; Я не хотел, чтобы чтение ожидалось при записи или наоборот, и я буду работать с большим количеством данных. Есть ли у вас ответ по поводу поставленной мною проблемы? Robert Brooks
Я добился некоторого прогресса и выяснил, что операция записи не выполнялась в приложении сервера из-за ошибки синтаксического анализа json. Так что теперь мой SocketChannel в коде клиентского приложения становится готовым к другой операции записи после того, как он обработал операцию чтения. Я предполагаю, что это TCP-природа SocketChannels. Однако SocketChannel не становится доступным для операции чтения на стороне приложения сервера. Это нормальное поведение для SocketChannels? Нужно ли закрывать соединение на стороне клиента после операции чтения и устанавливать новое соединение? Robert Brooks

Ваш Ответ

1   ответ
4

OP_CONNECT is to attempt finishConnect() once, and if it succeeds deregister OP_CONNECT and register OP_READ or OP_WRITE, probably the latter as you are a client. Looping and sleeping in non-blocking mode doesn't make sense. If finishConnect() returns false, OP_CONNECT will fire again.

Your processing of !key.isAcceptable(), !key.isReadable(), and !key.isWriteable() makes absolutely zero sense whatsoever. If the key is acceptable, call accept(). If it's readable, call read(). If it's writeable, call write(). It's as simple as that.

You need to be aware that channels are almost always writeable, except for the brief periods when their socket send buffer is full. So only register for OP_WRITE when you have something to write, or better still after you've tried a write and got a zero return; then when OP_WRITE fires, retry the write and deregister OP_WRITE unless you got another zero.

You are being far too economical with your ByteBuffer. In practice you need one per channel. You can save it as the key attachment so you can get it back when you need it. Otherwise you don't have any way of accumulating partial reads, which are certain to happen, or any way of retrying writes either.

Error: User Rate Limit Exceededwrite()Error: User Rate Limit Exceeded
Error: User Rate Limit Exceeded Robert Brooks
Error: User Rate Limit Exceeded Robert Brooks
Error: User Rate Limit Exceeded Robert Brooks
Error: User Rate Limit Exceeded Robert Brooks

Похожие вопросы