package actor; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.Optional; public abstract class ActorThread extends Thread { private final LinkedBlockingQueue q = new LinkedBlockingQueue<>(); /** Called by another thread, to send a message to this thread. */ public void send(M message) { System.out.println(" # Sending message: " + message); q.offer(message); System.out.println(" % Queue State: " + q); } /** Returns the first message in the queue, or blocks if none available. */ protected M receive() throws InterruptedException { M mess = q.take(); System.out.println(" - Message taken: " + mess); return mess; } /** Returns the first message in the queue, or blocks up to 'timeout' milliseconds if none available. Returns null if no message is obtained within 'timeout' milliseconds. */ protected M receiveWithTimeout(long timeout) throws InterruptedException { return q.poll(timeout, TimeUnit.MILLISECONDS); } protected Optional poll(long timeOut) { try { Optional m = Optional.ofNullable(q.poll(timeOut, TimeUnit.MILLISECONDS)); System.out.println("Stolen: " + m); return m; } catch (InterruptedException e) { return Optional.empty(); } catch (Exception e) { return Optional.empty(); // Or exit the program } } protected void usleep(long time) { try { Thread.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } } }