
Java Forum / General / April 2007


Fast Semaphore

Joe Seigh - 06 Apr 2007 11:44 GMT
Here's a port of a fast pathed semaphore I did elsewhere.  It
only does single permit acquire and release.  If you use it
in conjunction with ConcurrentLinkedQueue you can get a blocking
queue that's up to 3X faster than LinkedBlockingQueue under
contention.

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

public class FastSemaphore {
   
    private final AtomicInteger count;   // semaphore count
    private final AtomicInteger cancel;  // deferred cancelation count
    private final Semaphore     sem;     // slow semaphore

    public FastSemaphore(int z, boolean fair) {
        count = new AtomicInteger(z);
        cancel = new AtomicInteger(0);
        sem = new Semaphore(0, fair);
    }
   
    /*
    * processCancels - add cancelCount to current count
    *
    *   increment count by min(cancelCount, -(count)) iff count < 0
    */
    void processCancels(int cancelCount) {
        int    oldCount;
        int newCount;
       
        if (cancelCount > 0) {
            while ((oldCount = count.get()) < 0) {
                if ((newCount = oldCount + cancelCount) > 0)
                    newCount = 0;
               
                if (count.compareAndSet(oldCount, newCount)) {
                    cancelCount -= (newCount - oldCount);        // update cancelCount           
                    break;
                }               
            }
        }
       
        // add any untransferred cancelCount back into cancel
        if (cancelCount > 0) {
            cancel.addAndGet(cancelCount);
        }       
    }
   
    public void acquire()
    throws InterruptedException
    {
        if (count.addAndGet(-1) < 0) {
            try {
                sem.acquire();
            }
           
            catch (InterruptedException e) {
                // uncomment one and only one of the following 2 statements
                cancel.incrementAndGet();
                // processCancels(1);
                throw e;
            }
        }
    }
   
    public boolean tryAcquire(long timeout, TimeUnit unit)
    throws InterruptedException
    {
        boolean rc;
        if (count.addAndGet(-1) < 0) {
            try {
                rc = sem.tryAcquire(timeout, unit);               
            }
           
            catch (InterruptedException e) {
                // uncomment one and only one of the following 2 statements
                cancel.incrementAndGet();
                // processCancels(1);
                throw e;
            }
           
            if (!rc) {
                // timed out: record the abandoned wait, same choice as above
                cancel.incrementAndGet();
                // processCancels(1);
            }
            return rc;
        }
       
        else
            return true;
    }
   
    public boolean tryAcquire() {
        int    oldCount;
       
        do {
            oldCount = count.get();
        }
        while (oldCount > 0 && !count.compareAndSet(oldCount, (oldCount - 1)));
       
        return (oldCount > 0);
       
    }
   
    public void release() {
        if (cancel.get() > 0 && count.get() < 0) {
            processCancels(cancel.getAndSet(0));
        }
       
        if (count.addAndGet(1) <= 0) {
            sem.release();
        }
    }
   
}

/*-*/
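
For readers who want the fast-path idea in isolation, here is a minimal sketch with the cancellation logic stripped out. It is not the class above: FastPathSketch, acquireUninterruptibly() and the slowPathHits counter are names made up for illustration, and it sidesteps cancels entirely by only offering an uninterruptible acquire. The point it tries to show is that the underlying Semaphore is touched only when the atomic count goes negative.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified illustration; no timeout or interrupt support.
class FastPathSketch {
    // Available permits when >= 0; below zero, threads are (or are about to be) parked on `slow`.
    private final AtomicInteger count;
    // Slow semaphore, starts with zero permits and is used only under contention.
    private final Semaphore slow = new Semaphore(0);
    // Instrumentation only (an assumption of this sketch): counts slow-path entries.
    private final AtomicInteger slowPathHits = new AtomicInteger();

    FastPathSketch(int permits) {
        count = new AtomicInteger(permits);
    }

    void acquireUninterruptibly() {
        // Fast path: one atomic decrement when a permit is available.
        if (count.decrementAndGet() < 0) {
            slowPathHits.incrementAndGet();
            slow.acquireUninterruptibly();  // slow path: block until a release hands over a permit
        }
    }

    void release() {
        // A result <= 0 means some thread had pushed the count negative, so wake one waiter.
        if (count.incrementAndGet() <= 0) {
            slowPathHits.incrementAndGet();
            slow.release();
        }
    }

    int count() { return count.get(); }

    int slowPathHits() { return slowPathHits.get(); }
}
```

With two permits, two acquires followed by two releases on one thread should never reach the slow semaphore, which is where any speedup under light contention would come from.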

Signature

Joe Seigh

When you get lemons, you make lemonade.
When you get hardware, you make software.

neuneudr@yahoo.fr - 06 Apr 2007 19:47 GMT
> Here's a port of a fast pathed semaphore I did elsewhere.  It
> only does single permit acquire and release.

Hi,

you obviously know quite a lot about multi-threaded programming.
So I've got a question for you (I'm confused).

I saw that the Semaphore class got added to Java 1.5 by *the*
Java concurrency guru (Doug Lea).

On Wikipedia I find this:

"Semaphores remain in common use in programming languages that
"do not intrinsically support other forms of synchronization.

I've discovered multi-threaded programming with Java and
I try to do synchronization correctly (and I use the
java.util.concurrent package quite often).

So my question is simple: what do semaphores (and fast
semaphores) bring to Java that wasn't available before?

IOW, what can we do now in Java 1.5 that we couldn't
do in Java 1.4?

Is it "simply" efficiency (which of course is very important),
or are there other benefits that are also very important?

Is it possible to write correct multi-threaded Java programs
without using semaphores?

Thanks in advance for any info,

 Driss

Joe Seigh - 07 Apr 2007 01:34 GMT
>>Here's a port of a fast pathed semaphore I did elsewhere.  It
>>only does single permit acquire and release.

[...]
> So my question is simple: what do semaphores (and fast
> semaphores) bring to Java that wasn't available before?

Semaphores are just another synchronization primitive.  They're
not as widely applicable as locks and condition variables, but
when you do run into a situation where they're useful, they're
nice to have.  Semaphores are useful for limiting the number of
threads accessing a resource.  They sometimes work well for
producer/consumer too, although they don't address the thread
safety of the queue itself.  Other synchronization primitives are
barriers (already in 1.5) and eventcounts, which aren't in Java.
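
A minimal sketch of the "limit the number of threads accessing a resource" case, using the stock java.util.concurrent.Semaphore. The class and method names (ResourceLimiterDemo, maxObservedConcurrency) are made up for illustration, the "resource" is just a 1 ms sleep, and the code assumes Java 8 or later for the lambda and accumulateAndGet.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of the code posted in this thread.
class ResourceLimiterDemo {

    // Runs `tasks` jobs on `threads` pool threads while a semaphore admits at
    // most `permits` of them into the guarded section at once. Returns the
    // highest number of threads ever observed inside that section.
    static int maxObservedConcurrency(int permits, int threads, int tasks) {
        final Semaphore limiter = new Semaphore(permits);
        final AtomicInteger inside = new AtomicInteger();
        final AtomicInteger maxInside = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads);

        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    limiter.acquire();              // blocks while all permits are taken
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;                         // never got a permit, so nothing to release
                }
                try {
                    int now = inside.incrementAndGet();
                    maxInside.accumulateAndGet(now, Math::max);
                    Thread.sleep(1);                // stand-in for using the limited resource
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inside.decrementAndGet();
                    limiter.release();              // always hand the permit back
                }
            });
        }

        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return maxInside.get();
    }

    public static void main(String[] args) {
        System.out.println("max concurrency = " + maxObservedConcurrency(3, 8, 40));
    }
}
```

The returned value should never exceed the permit count, even though the pool has more threads than permits.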

> IOW, what can we do now in Java 1.5 that we couldn't
> do in Java 1.4?

Nothing, but it's more a question of efficiency and of being
able to use the more "natural" design patterns associated with the
synchronization primitive in question.
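
To illustrate the "nothing" part: a counting semaphore can be sketched with nothing but intrinsic locks and wait/notify, all of which predate 1.5. This is only an illustrative sketch under that assumption, not how j.u.c.Semaphore is implemented; MonitorSemaphore is a made-up name, and it makes no fairness guarantees.

```java
// Hypothetical class; uses only language features available before Java 1.5.
class MonitorSemaphore {
    private int permits;

    MonitorSemaphore(int initialPermits) {
        permits = initialPermits;
    }

    // Blocks until a permit is available, then takes it.
    synchronized void acquire() throws InterruptedException {
        while (permits <= 0) {   // loop guards against spurious wakeups and barging threads
            wait();
        }
        permits--;
    }

    // Takes a permit only if one is available right now; never blocks.
    synchronized boolean tryAcquire() {
        if (permits <= 0) {
            return false;
        }
        permits--;
        return true;
    }

    // Returns one permit and wakes one waiting thread, if any.
    synchronized void release() {
        permits++;
        notify();
    }

    synchronized int availablePermits() {
        return permits;
    }
}
```

Every operation here takes the object's monitor, even when a permit is free, which is the sort of cost the 1.5 classes and the fast-path approach try to avoid.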

> Is it "simply" efficiency (which of course is very important),
> or are there other benefits that are also very important?
>
> Is it possible to write correct multi-threaded Java programs
> without using semaphores?

It better be. :)

> Thanks in advance for any info,

Daniel Pitts - 06 Apr 2007 23:04 GMT
> Here's a port of a fast pathed semaphore I did elsewhere.  It
> only does single permit acquire and release.  If you use it
> in conjunction with ConcurrentLinkedQueue you can get a blocking
> queue that's up to 3X faster than LinkedBlockingQueue under
> contention.

What benchmarks did you use? Code?

Robert Klemme - 07 Apr 2007 13:39 GMT
>> Here's a port of a fast pathed semaphore I did elsewhere.  It
>> only does single permit acquire and release.  If you use it
[quoted text clipped - 3 lines]
>>
> What benchmarks did you use? Code?

I'm curious, too, how it compares to j.u.c.Semaphore.

Kind regards

    robert

Joe Seigh - 07 Apr 2007 13:49 GMT
>>> Here's a port of a fast pathed semaphore I did elsewhere.  It
>>> only does single permit acquire and release.  If you use it
[quoted text clipped - 5 lines]
>
> I'm curious, too, how it compares to j.u.c.Semaphore.

I'm rewriting the testcase to make it a bit more compact.  It was a bit
large for posting.

The contention is set artificially* high: a bunch of threads doing queue
operations on a queue that's non-empty most of the time.

* High compared with normal usage, unless you are running a high-throughput
server or something like that.

Joe Seigh - 07 Apr 2007 15:20 GMT
>>> Here's a port of a fast pathed semaphore I did elsewhere.  It
>>> only does single permit acquire and release.  If you use it
[quoted text clipped - 5 lines]
>
> I'm curious, too, how it compares to j.u.c.Semaphore.

Ok.  I have a new design pattern that subsets interfaces so
I can compare apple to orange implementations without having
to implement the fruit, plant, multi-celled organism, cell,
dna, organic molecule, molecule, and atom interfaces for
orange.  Java's architects really have nothing better to do
with other people's time.

import java.util.Formatter;
import java.util.concurrent.*;

interface Sem {
    public void acquire() throws InterruptedException;
    public void release();
}

class NSem extends Semaphore implements Sem {
    public NSem(int n, boolean fair) {super(n, fair); }
}

class FSem extends FastSemaphore implements Sem {
    public FSem(int n, boolean fair) { super(n, fair); }
}

interface fifo<T> {
    public void queue(T o);
    public T dequeue() throws InterruptedException;
}

class ConcurrentFifoQueue<T>
    implements fifo<T>
{
    ConcurrentLinkedQueue<T>    queue;
    Sem                            sem;
   
    ConcurrentFifoQueue(Sem s) {
        queue = new ConcurrentLinkedQueue<T>();
        sem = s;
    }
    public void queue(T o) {
        queue.offer(o);
        sem.release();
    }
    public T dequeue() throws InterruptedException {
        sem.acquire();
        return queue.poll();
    }
}

class BlockingFifoQueue<T>
extends java.util.concurrent.LinkedBlockingQueue<T>
implements fifo<T>
{
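    public void queue(T o) {
        try {
            put(o);    // does not wait for space: the queue is unbounded
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();    // keep the interrupt status instead of swallowing it
        }
    }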
    public T dequeue() throws InterruptedException { return take(); }
}

public class qtest {

    /**
    * multi-threaded queueing test
    *
    */
    final static int loopcount = 20000;
    final static int nodecount = 200;
    final static int threadcount = 20;
    final static Formatter fmt = new java.util.Formatter(System.out);
   
    public void test(final fifo<Object> fullq, final fifo<Object> emptyq, String desc) {
        int    j;
        long t0, t1;                // start, stop times
        Thread producer[] = new Thread[threadcount];
        Thread consumer[] = new Thread[threadcount];
        final CyclicBarrier barrier = new CyclicBarrier(producer.length + consumer.length + 1);
       
        for (j = 0; j < nodecount; j++) {
            emptyq.queue(new Object());
        }
   
        for (j = 0; j < producer.length; j++) {
            producer[j] = new Thread(new Runnable() {
                public void run() {
                    try {barrier.await(); } catch (Exception e)    {}
                    for (int j = 0; j < loopcount; j++) {
                        try {fullq.queue(emptyq.dequeue()); } catch (InterruptedException e) {}
                    }                   
                    try {barrier.await(); } catch (Exception e)    {}
                }
            });
            producer[j].setDaemon(true);
            producer[j].start();
        }
       
        for (j = 0; j < consumer.length; j++) {
            consumer[j] = new Thread(new Runnable() {
                public void run() {           
                    try {barrier.await(); } catch (Exception e)    {}
                    for (int j = 0; j < loopcount; j++) {
                        try {emptyq.queue(fullq.dequeue()); } catch (InterruptedException e) {}
                    }
                    try {barrier.await(); } catch (Exception e)    {}
                }
            });
            consumer[j].setDaemon(true);
            consumer[j].start();
        }
       
        try {barrier.await(); } catch (Exception e)    {}
        t0 = System.nanoTime();

        try {barrier.await(); } catch (Exception e)    {}
        t1 = System.nanoTime();
        double x = ((t1 - t0)/1e9);
        System.out.println(desc + ":");
        fmt.format("runtime        = %12.9f secs\n\n", x);       
    }
   
   
       
    public static void main(String[] args) {
       
       
        qtest q = new qtest();
       
        fmt.format("loop count     = %6d\n", loopcount);
        fmt.format("queue size     = %6d\n", nodecount);
        fmt.format("producer count = %6d\n", threadcount);
        fmt.format("consumer count = %6d\n\n", threadcount);

       
        q.test(
                new BlockingFifoQueue<Object>(),
                new BlockingFifoQueue<Object>(),
                "LinkedBlockingQueue");
       
        q.test(
                new ConcurrentFifoQueue<Object>(new NSem(0, false)),
                new ConcurrentFifoQueue<Object>(new NSem(0, false)),
                "ConcurrentLinkedQueue w/ unfair semaphore");
       
        q.test(
                new ConcurrentFifoQueue<Object>(new NSem(0, true)),
                new ConcurrentFifoQueue<Object>(new NSem(0, true)),
                "ConcurrentLinkedQueue w/ fair semaphore");
       
        q.test(
                new ConcurrentFifoQueue<Object>(new FSem(0, false)),
                new ConcurrentFifoQueue<Object>(new FSem(0, false)),
                "ConcurrentLinkedQueue w/ unfair fast semaphore");
       
        q.test(
                new ConcurrentFifoQueue<Object>(new FSem(0, true)),
                new ConcurrentFifoQueue<Object>(new FSem(0, true)),
                "ConcurrentLinkedQueue w/ fair fast semaphore");
       
        System.out.println("qtest exiting...");

    }

}
