Adds semaphores.

This commit is contained in:
Tanmay 2022-08-24 13:57:51 +01:00
parent c7437d90bc
commit 7dbf8cb992
3 changed files with 35 additions and 12 deletions

View File

@ -1,6 +1,7 @@
package com.scaler.producerconsumer; package com.scaler.producerconsumer;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.Semaphore;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Getter; import lombok.Getter;
@ -11,16 +12,25 @@ public class Consumer implements Runnable {
private Queue<UnitOfWork> store; private Queue<UnitOfWork> store;
private String name; private String name;
private Semaphore forProducer;
private Semaphore forConsumer;
@Override @Override
public void run() { public void run() {
while (true) { while (true) {
synchronized (store) { try {
if (store.size() > 0) { forConsumer.acquire();
store.remove(); } catch (InterruptedException e) {
System.out.println("Consumed: " + name + " Left units :" + store.size()); throw new RuntimeException("Error acquiring semaphore " + e);
}
} }
if (store.size() > 0) {
store.remove();
System.out.println("Consumed: " + name + " Left units :" + store.size());
}
forProducer.release();
} }
} }

View File

@ -1,6 +1,7 @@
package com.scaler.producerconsumer; package com.scaler.producerconsumer;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.Semaphore;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Getter; import lombok.Getter;
@ -12,16 +13,24 @@ public class Producer implements Runnable {
private Queue<UnitOfWork> store; private Queue<UnitOfWork> store;
private int maxSize; private int maxSize;
private String name; private String name;
private Semaphore forProducer;
private Semaphore forConsumer;
@Override @Override
public void run() { public void run() {
while (true) { while (true) {
synchronized (store) { try {
if (store.size() < maxSize) { forProducer.acquire();
store.add(new UnitOfWork()); } catch (InterruptedException e) {
System.out.println("Produced: " + name + " Left units :" + store.size()); throw new RuntimeException("Error acquiring semaphore " + e);
}
} }
if (store.size() < maxSize) {
store.add(new UnitOfWork());
System.out.println("Produced: " + name + " Left units :" + store.size());
}
forConsumer.release();
} }
} }

View File

@ -3,6 +3,7 @@ package com.scaler.producerconsumer;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.Queue; import java.util.Queue;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.stream.Collectors; import java.util.stream.Collectors;
public class Runner { public class Runner {
@ -14,14 +15,17 @@ public class Runner {
Queue<UnitOfWork> store = new ArrayDeque<>(); Queue<UnitOfWork> store = new ArrayDeque<>();
int maxSize = 20; int maxSize = 20;
Semaphore forProducer = new Semaphore(maxSize);
Semaphore forConsumer = new Semaphore(0);
Set<Producer> producers = producerNames Set<Producer> producers = producerNames
.stream() .stream()
.map(name -> new Producer(store, maxSize, name)) .map(name -> new Producer(store, maxSize, name, forProducer, forConsumer))
.collect(Collectors.toSet()); .collect(Collectors.toSet());
Set<Consumer> consumers = consumerNames Set<Consumer> consumers = consumerNames
.stream() .stream()
.map(name -> new Consumer(store, name)) .map(name -> new Consumer(store, name, forProducer, forConsumer))
.collect(Collectors.toSet()); .collect(Collectors.toSet());
producers.forEach(producer -> new Thread(producer).start()); producers.forEach(producer -> new Thread(producer).start());