java:concurrentqueue
A concurrent version of a queue
This queue class is concurrent: it allows many threads to write in the queue or to read from the queue. If you are interested, I made a highly paralell program to test it.
package com.copytables.util; public class ConcurrentQueue<T> { private class ElementHolder<Q> { public Q data; public ElementHolder<Q> next; @SuppressWarnings("unused") public ElementHolder() { this.data = null; this.next = null; } public ElementHolder( Q data ) { this.data = data; this.next = null; } @SuppressWarnings("unused") public ElementHolder( Q data, ElementHolder<Q> next ) { this.data = data; this.next = next; } } ElementHolder<T> base; ElementHolder<T> last; private int MAX_SIZE; private int numElements; // insertPosition points to the FIRST EMPTY // position // retrievePosition points to the LAST NOT EMPTY // position public ConcurrentQueue() { base = null; last = base; numElements = 0; MAX_SIZE = 10000; } public ConcurrentQueue( int bufferSize ) { base = null; last = base; numElements = 0; MAX_SIZE = 10000; } public synchronized void add( T elem ) { // we wait until it's satisfied the condition // that insertPosition < retrievePosition // (that there is margin to insert at least // one element) // (see: // http://docs.oracle.com/javase/tutorial/essential/concurrency/guardmeth.html // for details abaout this idiom) while( !hasFree() ) { try { wait(); }catch( InterruptedException e ) {} } if( base == null && last == null ) { base = new ElementHolder<T>( elem ); last = base; } else { last.next = new ElementHolder<T>( elem ); last = last.next; } numElements++; notifyAll(); } // add // retrieve an element of the queue public synchronized T next( ) { T out; while( !hasNext() ) { try { wait(); }catch( InterruptedException e ) { // do nothing; is for end the wait() operation } } // while // base != null out = base.data; if( base == last ) { last = last.next; } base = base.next; numElements--; notifyAll(); return out; } // next /** * return true when there is a next element in * the queue, if there are no elements it waits * @return */ public synchronized boolean waitForNext( int milliseconds ) { if( !hasNext() ) { try { wait( milliseconds ); }catch( InterruptedException e ) { // do nothing; is for end the wait() operation } } // !hasNext return hasNext(); } /** * return true when there is a next element in * the queue, if there are no elements it waits * forever * @return */ public synchronized boolean waitForNext( ) { if( !hasNext() ) { try { wait( ); }catch( InterruptedException e ) { // do nothing; is for end the wait() operation } } // !hasNext return hasNext(); } public synchronized boolean hasNext() { return base != null; } // hasNext public synchronized boolean hasFree() { return numElements < MAX_SIZE; } }
java/concurrentqueue.txt · Last modified: 2022/12/02 21:02 by 127.0.0.1