java:concurrentqueue
Differences
This shows you the differences between two versions of the page.
| Both sides previous revisionPrevious revisionNext revision | Previous revision | ||
| java:concurrentqueue [2012/05/30 16:19] – rlunaro | java:concurrentqueue [2022/12/02 21:02] (current) – external edit 127.0.0.1 | ||
|---|---|---|---|
| Line 1: | Line 1: | ||
| + | ====== A concurrent version of a queue ====== | ||
| + | |||
| + | |||
| + | This queue class is concurrent: it allows many threads to write in the queue or to read from the queue. If you are interested, [[queueTester|I made a highly paralell program to test it]]. | ||
| + | |||
| + | <code java> | ||
| + | package com.copytables.util; | ||
| + | |||
| + | public class ConcurrentQueue< | ||
| + | { | ||
| + | private class ElementHolder< | ||
| + | { | ||
| + | public Q data; | ||
| + | public ElementHolder< | ||
| + | | ||
| + | @SuppressWarnings(" | ||
| + | public ElementHolder() | ||
| + | { | ||
| + | this.data = null; | ||
| + | this.next = null; | ||
| + | } | ||
| + | | ||
| + | { | ||
| + | this.data = data; | ||
| + | this.next = null; | ||
| + | } | ||
| + | @SuppressWarnings(" | ||
| + | public ElementHolder( Q data, ElementHolder< | ||
| + | { | ||
| + | this.data = data; | ||
| + | this.next = next; | ||
| + | } | ||
| + | } | ||
| + | | ||
| + | ElementHolder< | ||
| + | ElementHolder< | ||
| + | private int MAX_SIZE; | ||
| + | private int numElements; | ||
| + | | ||
| + | | ||
| + | // insertPosition points to the FIRST EMPTY | ||
| + | // position | ||
| + | // retrievePosition points to the LAST NOT EMPTY | ||
| + | // position | ||
| + | public ConcurrentQueue() | ||
| + | { | ||
| + | base = null; | ||
| + | last = base; | ||
| + | numElements = 0; | ||
| + | MAX_SIZE = 10000; | ||
| + | } | ||
| + | | ||
| + | public ConcurrentQueue( int bufferSize ) | ||
| + | { | ||
| + | base = null; | ||
| + | last = base; | ||
| + | numElements = 0; | ||
| + | MAX_SIZE = 10000; | ||
| + | } | ||
| + | | ||
| + | public synchronized void add( T elem ) | ||
| + | { | ||
| + | // we wait until it's satisfied the condition | ||
| + | // that insertPosition < retrievePosition | ||
| + | // (that there is margin to insert at least | ||
| + | // one element) | ||
| + | // (see: | ||
| + | // http:// | ||
| + | // for details abaout this idiom) | ||
| + | while( !hasFree() ) | ||
| + | { | ||
| + | try | ||
| + | { | ||
| + | wait(); | ||
| + | }catch( InterruptedException e ) | ||
| + | {} | ||
| + | } | ||
| + | | ||
| + | if( base == null && last == null ) | ||
| + | { | ||
| + | base = new ElementHolder< | ||
| + | last = base; | ||
| + | } | ||
| + | else | ||
| + | { | ||
| + | last.next = new ElementHolder< | ||
| + | last = last.next; | ||
| + | } | ||
| + | numElements++; | ||
| + | notifyAll(); | ||
| + | } // add | ||
| + | | ||
| + | // retrieve an element of the queue | ||
| + | public synchronized T next( ) | ||
| + | { | ||
| + | T out; | ||
| + | | ||
| + | while( !hasNext() ) | ||
| + | { | ||
| + | try | ||
| + | { | ||
| + | wait(); | ||
| + | }catch( InterruptedException e ) | ||
| + | { | ||
| + | // do nothing; is for end the wait() operation | ||
| + | } | ||
| + | } // while | ||
| + | |||
| + | // base != null | ||
| + | out = base.data; | ||
| + | if( base == last ) | ||
| + | { | ||
| + | last = last.next; | ||
| + | } | ||
| + | base = base.next; | ||
| + | numElements--; | ||
| + | notifyAll(); | ||
| + | | ||
| + | return out; | ||
| + | } // next | ||
| + | | ||
| + | | ||
| + | /** | ||
| + | * return true when there is a next element in | ||
| + | * the queue, if there are no elements it waits | ||
| + | * @return | ||
| + | */ | ||
| + | public synchronized boolean waitForNext( int milliseconds ) | ||
| + | { | ||
| + | if( !hasNext() ) | ||
| + | { | ||
| + | try | ||
| + | { | ||
| + | wait( milliseconds ); | ||
| + | }catch( InterruptedException e ) | ||
| + | { | ||
| + | // do nothing; is for end the wait() operation | ||
| + | } | ||
| + | } // !hasNext | ||
| + | return hasNext(); | ||
| + | } | ||
| + | |||
| + | /** | ||
| + | * return true when there is a next element in | ||
| + | * the queue, if there are no elements it waits | ||
| + | * forever | ||
| + | * @return | ||
| + | */ | ||
| + | public synchronized boolean waitForNext( ) | ||
| + | { | ||
| + | if( !hasNext() ) | ||
| + | { | ||
| + | try | ||
| + | { | ||
| + | wait( ); | ||
| + | }catch( InterruptedException e ) | ||
| + | { | ||
| + | // do nothing; is for end the wait() operation | ||
| + | } | ||
| + | } // !hasNext | ||
| + | return hasNext(); | ||
| + | } | ||
| + | | ||
| + | | ||
| + | public synchronized boolean hasNext() | ||
| + | { | ||
| + | return base != null; | ||
| + | } // hasNext | ||
| + | | ||
| + | public synchronized boolean hasFree() | ||
| + | { | ||
| + | return numElements < MAX_SIZE; | ||
| + | } | ||
| + | } | ||
| + | |||
| + | </ | ||
| + | |||
