diff --git a/os/code/os/src/main/java/com/scaler/producerconsumer/Consumer.java b/os/code/os/src/main/java/com/scaler/producerconsumer/Consumer.java index 4822c7a..fb07181 100644 --- a/os/code/os/src/main/java/com/scaler/producerconsumer/Consumer.java +++ b/os/code/os/src/main/java/com/scaler/producerconsumer/Consumer.java @@ -1,6 +1,7 @@ package com.scaler.producerconsumer; import java.util.Queue; +import java.util.concurrent.Semaphore; import lombok.AllArgsConstructor; import lombok.Getter; @@ -11,16 +12,25 @@ public class Consumer implements Runnable { private Queue store; private String name; + private Semaphore forProducer; + private Semaphore forConsumer; @Override public void run() { while (true) { - synchronized (store) { - if (store.size() > 0) { - store.remove(); - System.out.println("Consumed: " + name + " Left units :" + store.size()); - } + try { + forConsumer.acquire(); + } catch (InterruptedException e) { + throw new RuntimeException("Error acquiring semaphore " + e); } + + if (store.size() > 0) { + store.remove(); + System.out.println("Consumed: " + name + " Left units :" + store.size()); + } + + forProducer.release(); + } } diff --git a/os/code/os/src/main/java/com/scaler/producerconsumer/Producer.java b/os/code/os/src/main/java/com/scaler/producerconsumer/Producer.java index b360808..b438860 100644 --- a/os/code/os/src/main/java/com/scaler/producerconsumer/Producer.java +++ b/os/code/os/src/main/java/com/scaler/producerconsumer/Producer.java @@ -1,6 +1,7 @@ package com.scaler.producerconsumer; import java.util.Queue; +import java.util.concurrent.Semaphore; import lombok.AllArgsConstructor; import lombok.Getter; @@ -12,16 +13,24 @@ public class Producer implements Runnable { private Queue store; private int maxSize; private String name; + private Semaphore forProducer; + private Semaphore forConsumer; @Override public void run() { while (true) { - synchronized (store) { - if (store.size() < maxSize) { - store.add(new UnitOfWork()); - System.out.println("Produced: " + name + " Left units :" + store.size()); - } + try { + forProducer.acquire(); + } catch (InterruptedException e) { + throw new RuntimeException("Error acquiring semaphore " + e); } + + if (store.size() < maxSize) { + store.add(new UnitOfWork()); + System.out.println("Produced: " + name + " Left units :" + store.size()); + } + + forConsumer.release(); } } diff --git a/os/code/os/src/main/java/com/scaler/producerconsumer/Runner.java b/os/code/os/src/main/java/com/scaler/producerconsumer/Runner.java index 78ee9db..75b5cb7 100644 --- a/os/code/os/src/main/java/com/scaler/producerconsumer/Runner.java +++ b/os/code/os/src/main/java/com/scaler/producerconsumer/Runner.java @@ -3,6 +3,7 @@ package com.scaler.producerconsumer; import java.util.ArrayDeque; import java.util.Queue; import java.util.Set; +import java.util.concurrent.Semaphore; import java.util.stream.Collectors; public class Runner { @@ -14,14 +15,17 @@ public class Runner { Queue store = new ArrayDeque<>(); int maxSize = 20; + Semaphore forProducer = new Semaphore(maxSize); + Semaphore forConsumer = new Semaphore(0); + Set producers = producerNames .stream() - .map(name -> new Producer(store, maxSize, name)) + .map(name -> new Producer(store, maxSize, name, forProducer, forConsumer)) .collect(Collectors.toSet()); Set consumers = consumerNames .stream() - .map(name -> new Consumer(store, name)) + .map(name -> new Consumer(store, name, forProducer, forConsumer)) .collect(Collectors.toSet()); producers.forEach(producer -> new Thread(producer).start());