User Tools

Site Tools


java:concurrentqueue

Differences

This shows you the differences between two versions of the page.

Link to this comparison view

Next revision
Previous revision
java:concurrentqueue [2012/05/30 18:18] – creado rlunarojava:concurrentqueue [2022/12/02 22:02] (current) – external edit 127.0.0.1
Line 1: Line 1:
 +====== A concurrent version of a queue ======
 +
 +
 +This queue class is concurrent: it allows many threads to write in the queue or to read from the queue. If you are interested, [[queueTester|I made a highly paralell program to test it]].
 +
 +<code java>
 +package com.copytables.util;
 +
 +public class ConcurrentQueue<T>
 +{
 +    private class ElementHolder<Q>
 +    {
 +        public Q data; 
 +        public ElementHolder<Q> next;
 +        
 +        @SuppressWarnings("unused")
 +        public ElementHolder()
 +        {
 +            this.data = null;
 +            this.next = null;
 +        }
 +       public ElementHolder( Q data )
 +        {
 +            this.data = data;
 +            this.next = null;
 +        }
 +        @SuppressWarnings("unused")
 +        public ElementHolder( Q data, ElementHolder<Q> next )
 +        {
 +            this.data = data;
 +            this.next = next;
 +        }
 +    }
 +    
 +    ElementHolder<T> base;
 +    ElementHolder<T> last;
 +    private int MAX_SIZE;
 +    private int numElements;
 +    
 +    
 +    // insertPosition points to the FIRST EMPTY
 +    // position
 +    // retrievePosition points to the LAST NOT EMPTY 
 +    // position 
 +    public ConcurrentQueue()
 +    {
 +        base = null;
 +        last = base;
 +        numElements = 0;
 +        MAX_SIZE = 10000;
 +    }
 +    
 +    public ConcurrentQueue( int bufferSize )
 +    {
 +        base = null;
 +        last = base;
 +        numElements = 0;
 +        MAX_SIZE = 10000;        
 +    }
 +    
 +    public synchronized void add( T elem )
 +    {
 +        // we wait until it's satisfied the condition 
 +        // that insertPosition < retrievePosition
 +        // (that there is margin to insert at least
 +        // one element)
 +        // (see:
 +        // http://docs.oracle.com/javase/tutorial/essential/concurrency/guardmeth.html
 +        // for details abaout this idiom)
 +        while( !hasFree() )
 +        {
 +            try
 +            {
 +                wait();
 +            }catch( InterruptedException e )
 +            {}
 +        }
 +        
 +        if( base == null && last == null )
 +        {
 +            base = new ElementHolder<T>( elem ); 
 +            last = base;
 +        }
 +        else
 +        {
 +            last.next = new ElementHolder<T>( elem );
 +            last = last.next;
 +        }
 +        numElements++;
 +        notifyAll();
 +    } // add
 +    
 +    // retrieve an element of the queue
 +    public synchronized T next( )
 +    {
 +        T out; 
 +        
 +        while( !hasNext() )
 +        {
 +            try
 +            {
 +                wait();
 +            }catch( InterruptedException e )
 +            {
 +                // do nothing; is for end the wait() operation
 +            }
 +        } // while
 +
 +        // base != null 
 +        out = base.data; 
 +        if( base == last )
 +        {
 +            last = last.next;
 +        }
 +        base = base.next; 
 +        numElements--;
 +        notifyAll();
 +        
 +        return out;
 +    } // next 
 +    
 +    
 +    /**
 +     * return true when there is a next element in 
 +     * the queue, if there are no elements it waits
 +     * @return
 +     */
 +    public synchronized boolean waitForNext( int milliseconds )
 +    {
 +        if( !hasNext() )
 +        {
 +            try
 +            {
 +                wait( milliseconds );
 +            }catch( InterruptedException e )
 +            {
 +                // do nothing; is for end the wait() operation
 +            }
 +        } // !hasNext
 +        return hasNext();
 +    }
 +
 +    /**
 +     * return true when there is a next element in 
 +     * the queue, if there are no elements it waits
 +     * forever
 +     * @return
 +     */
 +    public synchronized boolean waitForNext( )
 +    {
 +        if( !hasNext() )
 +        {
 +            try
 +            {
 +                wait( );
 +            }catch( InterruptedException e )
 +            {
 +                // do nothing; is for end the wait() operation
 +            }
 +        } // !hasNext
 +        return hasNext();
 +    }
 +    
 +    
 +    public synchronized boolean hasNext()
 +    {
 +        return base != null;
 +    } // hasNext
 +    
 +    public synchronized boolean hasFree()
 +    {
 +        return numElements < MAX_SIZE; 
 +    }
 +}
 +
 +</code>
 +