====== A concurrent version of a queue ====== This queue class is concurrent: it allows many threads to write in the queue or to read from the queue. If you are interested, [[queueTester|I made a highly paralell program to test it]]. package com.copytables.util; public class ConcurrentQueue { private class ElementHolder { public Q data; public ElementHolder next; @SuppressWarnings("unused") public ElementHolder() { this.data = null; this.next = null; } public ElementHolder( Q data ) { this.data = data; this.next = null; } @SuppressWarnings("unused") public ElementHolder( Q data, ElementHolder next ) { this.data = data; this.next = next; } } ElementHolder base; ElementHolder last; private int MAX_SIZE; private int numElements; // insertPosition points to the FIRST EMPTY // position // retrievePosition points to the LAST NOT EMPTY // position public ConcurrentQueue() { base = null; last = base; numElements = 0; MAX_SIZE = 10000; } public ConcurrentQueue( int bufferSize ) { base = null; last = base; numElements = 0; MAX_SIZE = 10000; } public synchronized void add( T elem ) { // we wait until it's satisfied the condition // that insertPosition < retrievePosition // (that there is margin to insert at least // one element) // (see: // http://docs.oracle.com/javase/tutorial/essential/concurrency/guardmeth.html // for details abaout this idiom) while( !hasFree() ) { try { wait(); }catch( InterruptedException e ) {} } if( base == null && last == null ) { base = new ElementHolder( elem ); last = base; } else { last.next = new ElementHolder( elem ); last = last.next; } numElements++; notifyAll(); } // add // retrieve an element of the queue public synchronized T next( ) { T out; while( !hasNext() ) { try { wait(); }catch( InterruptedException e ) { // do nothing; is for end the wait() operation } } // while // base != null out = base.data; if( base == last ) { last = last.next; } base = base.next; numElements--; notifyAll(); return out; } // next /** * return true when there is a next element in * the queue, if there are no elements it waits * @return */ public synchronized boolean waitForNext( int milliseconds ) { if( !hasNext() ) { try { wait( milliseconds ); }catch( InterruptedException e ) { // do nothing; is for end the wait() operation } } // !hasNext return hasNext(); } /** * return true when there is a next element in * the queue, if there are no elements it waits * forever * @return */ public synchronized boolean waitForNext( ) { if( !hasNext() ) { try { wait( ); }catch( InterruptedException e ) { // do nothing; is for end the wait() operation } } // !hasNext return hasNext(); } public synchronized boolean hasNext() { return base != null; } // hasNext public synchronized boolean hasFree() { return numElements < MAX_SIZE; } }