Home | Contact Us | FAQ | Search & Site Map | Link to Us
Sign In | Join | Other 45 Sites in Network
HomeAnnouncementsWhite Papers
Discussion GroupsFirst AidDatabasesJavaBeansGUIJava 3DVirtual MachineCORBASecurityToolsGeneral
Java DirectoryOpen Source ProjectsSample Book ChaptersUser GroupsWeb Resources
Related Topics
Databases.NETMore Topics ...

Java Forum / General / February 2008

Tip: Looking for answers? Try searching our database.

the best practise of udpchannel with selector??

Thread view: 
lightning - 18 Feb 2008 06:05 GMT
I wrote an module which can send and receive datagrams through UDP.
I use a selector to manage it,but I come with a problem of 100%cpu.

the code is as follows:

private ConcurrentLinkedQueue<ToSendData> resps = new
ConcurrentLinkedQueue<ToSendData>();
...

while (run) {
               selector.select();//why it does not block?
               if (key.isValid() && key.isReadable()) {
    ByteBuffer buffer =
ByteBuffer.allocate(Constant.MAX_RECEIVE_BUFFER_SIZE);
    buffer.order(ByteOrder.LITTLE_ENDIAN);
    InetSocketAddress sock = (InetSocketAddress) channel.receive(buffer);
    if (sock == null)
                   continue;
    receivedDatagramCount++;
    log.info("received No." + receivedDatagramCount+ " datagram");
    String ip = sock.getAddress().getHostAddress();
    int port = sock.getPort();
    if (!Crypt.decrypt(buffer.array(), buffer.position())) {
    log.warn("校验失败!");
    return;
    }
    buffer.flip();
    P2IHeaderInfo header = P2IHeaderInfo.getInstance(buffer);
               buffer.rewind();
    DispatchData data = new DispatchData(header, buffer, ip,port,
receivedDatagramCount);
    Task task = new Task(Task.DISPATCH, data);
    server.sendMessage(task);
    }
    if (key.isValid() && key.isWritable()) {
    if (resps.size() > 0) {
    for (ToSendData data = resps.poll(); data != null; data =
resps.poll()) {
        if (data.getIp().equals("e")) {
            break;
        }
        String ip = data.getIp();
        int port = data.getPort();
        ByteBuffer buffer = data.getBuffer();
        Crypt.encrypt(buffer.array(), buffer.remaining());
        byte[] x = new byte[buffer.remaining()];
        System.arraycopy(buffer.array(), 0, x, 0, buffer.remaining());
        sendDatagramCount++;
        log.info("Server sends No." + sendDatagramCount+ " datagram: ");
        channel.send(buffer,new InetSocketAddress(ip, port));
    }
    }
// If uncomment these block,the cpu goes 100%
//    else{
//          Thread.sleep(1);
//    }
    }

}

It seems that udpchannel is always writable and it won't block these
thread any bit,
I think maybe I need another thread to send datagrams and use a
LinkedBlockingQueue instead of ConcurrentLinkedQueue.

What is the best practice of making this module?
EJP - 19 Feb 2008 07:13 GMT
> It seems that udpchannel is always writable
Almost always, unless the socket send buffer is full. You shouldn't
normally register for OP_WRITE unless you've had a short write: in the
case of UPD, that would be a write() return of zero. If you get that,
queue the datagram, register for OP_WRITE, and when you get OP_WRITE,
unqueue the datagrams in the queue and try to send them all. If you
succeed, unregister OP_WRITE again.
lightning - 20 Feb 2008 01:37 GMT
Ok, I wrote the code like this:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;

public class UDPSender implements Runnable {
    private Thread thread;

    private boolean run;

    private DatagramChannel channel;

    private Selector selector;
    private SelectionKey mykey;
    private ConcurrentLinkedQueue<String> queue = new
ConcurrentLinkedQueue<String>();

    private String ip = "192.168.43.158";

    private int port = 2222;

    public UDPSender() {
        run = true;
        try {
            channel = DatagramChannel.open();
            channel.configureBlocking(false);
            selector = Selector.open();
            mykey=channel.register(selector, SelectionKey.OP_READ|
SelectionKey.OP_WRITE);
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
            throw new InstantiationError();
        }
    }

    public void start() {
        thread = new Thread(this);
        thread.start();
    }
    public void send(){
        queue.add("sadfasdf");
        selector.wakeup();
    }
    public void run() {
        // TODO Auto-generated method stub
        try {
            while (run) {
                if(!queue.isEmpty()){
                    mykey.interestOps(SelectionKey.OP_READ|SelectionKey.OP_WRITE);
                }
                selector.select();
                Iterator<SelectionKey> iter = selector.selectedKeys()
                        .iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    handle(key);
                    iter.remove();
                }

            }
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }

    private void handle(SelectionKey key) {
        if (key.isReadable()) {
            System.out.println("there are something to read");
            ByteBuffer k=ByteBuffer.allocate(1000);
            try {
                channel.receive(k);
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            System.out.println("received: --"+new String(k.array(),
0,k.position())+"--");
        }
        if (key.isWritable()) {
            ByteBuffer x = ByteBuffer.wrap("hello".getBytes());
            Iterator<String> iter = queue.iterator();
            while (iter.hasNext()) {
                System.out.println("I send!");
                String k=iter.next();
                try {
                    channel.send(x, new InetSocketAddress(ip, port));
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            queue.clear();
            key.interestOps(SelectionKey.OP_READ);
        }
    }

    public void stop() {
        run = false;
        selector.wakeup();
    }

}

It seems the code works well, are there any more suggestions?? thx!

> > It seems that udpchannel is always writable
>
[quoted text clipped - 4 lines]
> unqueue the datagrams in the queue and try to send them all. If you
> succeed, unregister OP_WRITE again.
EJP - 20 Feb 2008 04:17 GMT
> Ok, I wrote the code like this:
> mykey=channel.register(selector, SelectionKey.OP_READ|SelectionKey.OP_WRITE);
Didn't you read what I posted at all?
> channel.send(x, new InetSocketAddress(ip, port));
Ditto.
> It seems the code works well, are there any more suggestions?? thx!
That code would still exhibit your original problem. Try what I suggested.
lightning - 20 Feb 2008 05:29 GMT
Of course I got what you say and the origianl problem had already
gone,cpu now costs nothing.
In fact you did not see my trick here ;)

look at the handle method:

> queue.clear();
> key.interestOps(SelectionKey.OP_READ);

You see,although I registered WRITE in the beginning,but when the
first WRITE fires,
the handle method can unregister WRITE after doing nothing.

So whatever I register at first does not matter and yes if I didn't
register OP_WRITE at start it would seem more clear.

the key is the code above in "handle" method and the following:
> if(!queue.isEmpty()){
>          mykey.interestOps(SelectionKey.OP_READ|SelectionKey.OP_WRITE);
> }

> > Ok, I wrote the code like this:
> > mykey=channel.register(selector, SelectionKey.OP_READ|SelectionKey.OP_WRITE);
[quoted text clipped - 4 lines]
>
> That code would still exhibit your original problem. Try what I suggested.
EJP - 20 Feb 2008 09:46 GMT
So all you need to do now is implement the rest of what I said ...


Free Magazines

Get these publications absolutely FREE for up to 12 months. There are no hidden fees and no obligation. Simply choose a title, complete the application form and submit it. Read more ...

Oracle MagazineNetwork ComputingComputer WorldBio-IT WorldeWeekInformation WeekInfosecurity
 
Sign In
Join
My Latest Posts
My Monitored Threads
My Blog
My Photo Gallery
My Profile
My Homepage

Start New Thread
Enable EMail Alerts
Rate this Thread



©2008 Advenet LLC   Privacy Policy - Terms of Use
This website includes both content owned or controlled by Advenet as well as content owned or controlled by third parties.