User Tools

Site Tools


java:concurrentqueue

A concurrent version of a queue

This queue class is concurrent: it allows many threads to write in the queue or to read from the queue. If you are interested, I made a highly paralell program to test it.

package com.copytables.util;
 
public class ConcurrentQueue<T>
{
    private class ElementHolder<Q>
    {
        public Q data; 
        public ElementHolder<Q> next;
 
        @SuppressWarnings("unused")
        public ElementHolder()
        {
            this.data = null;
            this.next = null;
        }
       public ElementHolder( Q data )
        {
            this.data = data;
            this.next = null;
        }
        @SuppressWarnings("unused")
        public ElementHolder( Q data, ElementHolder<Q> next )
        {
            this.data = data;
            this.next = next;
        }
    }
 
    ElementHolder<T> base;
    ElementHolder<T> last;
    private int MAX_SIZE;
    private int numElements;
 
 
    // insertPosition points to the FIRST EMPTY
    // position
    // retrievePosition points to the LAST NOT EMPTY 
    // position 
    public ConcurrentQueue()
    {
        base = null;
        last = base;
        numElements = 0;
        MAX_SIZE = 10000;
    }
 
    public ConcurrentQueue( int bufferSize )
    {
        base = null;
        last = base;
        numElements = 0;
        MAX_SIZE = 10000;        
    }
 
    public synchronized void add( T elem )
    {
        // we wait until it's satisfied the condition 
        // that insertPosition < retrievePosition
        // (that there is margin to insert at least
        // one element)
        // (see:
        // http://docs.oracle.com/javase/tutorial/essential/concurrency/guardmeth.html
        // for details abaout this idiom)
        while( !hasFree() )
        {
            try
            {
                wait();
            }catch( InterruptedException e )
            {}
        }
 
        if( base == null && last == null )
        {
            base = new ElementHolder<T>( elem ); 
            last = base;
        }
        else
        {
            last.next = new ElementHolder<T>( elem );
            last = last.next;
        }
        numElements++;
        notifyAll();
    } // add
 
    // retrieve an element of the queue
    public synchronized T next( )
    {
        T out; 
 
        while( !hasNext() )
        {
            try
            {
                wait();
            }catch( InterruptedException e )
            {
                // do nothing; is for end the wait() operation
            }
        } // while
 
        // base != null 
        out = base.data; 
        if( base == last )
        {
            last = last.next;
        }
        base = base.next; 
        numElements--;
        notifyAll();
 
        return out;
    } // next 
 
 
    /**
     * return true when there is a next element in 
     * the queue, if there are no elements it waits
     * @return
     */
    public synchronized boolean waitForNext( int milliseconds )
    {
        if( !hasNext() )
        {
            try
            {
                wait( milliseconds );
            }catch( InterruptedException e )
            {
                // do nothing; is for end the wait() operation
            }
        } // !hasNext
        return hasNext();
    }
 
    /**
     * return true when there is a next element in 
     * the queue, if there are no elements it waits
     * forever
     * @return
     */
    public synchronized boolean waitForNext( )
    {
        if( !hasNext() )
        {
            try
            {
                wait( );
            }catch( InterruptedException e )
            {
                // do nothing; is for end the wait() operation
            }
        } // !hasNext
        return hasNext();
    }
 
 
    public synchronized boolean hasNext()
    {
        return base != null;
    } // hasNext
 
    public synchronized boolean hasFree()
    {
        return numElements < MAX_SIZE; 
    }
}
java/concurrentqueue.txt · Last modified: 2022/12/02 22:02 by 127.0.0.1