commit 0d2fabc9629bd989e6d7bb8f921a25a859f4e0da Author: Your Name Date: Wed Jul 15 18:02:38 2020 +0530 Added LLD projects diff --git a/distributed-cache/.idea/.gitignore b/distributed-cache/.idea/.gitignore new file mode 100644 index 0000000..26d3352 --- /dev/null +++ b/distributed-cache/.idea/.gitignore @@ -0,0 +1,3 @@ +# Default ignored files +/shelf/ +/workspace.xml diff --git a/distributed-cache/.idea/compiler.xml b/distributed-cache/.idea/compiler.xml new file mode 100644 index 0000000..bddbcc6 --- /dev/null +++ b/distributed-cache/.idea/compiler.xml @@ -0,0 +1,13 @@ + + + + + + + + + + + + + \ No newline at end of file diff --git a/distributed-cache/.idea/jarRepositories.xml b/distributed-cache/.idea/jarRepositories.xml new file mode 100644 index 0000000..712ab9d --- /dev/null +++ b/distributed-cache/.idea/jarRepositories.xml @@ -0,0 +1,20 @@ + + + + + + + + + + + \ No newline at end of file diff --git a/distributed-cache/.idea/misc.xml b/distributed-cache/.idea/misc.xml new file mode 100644 index 0000000..d24ea8e --- /dev/null +++ b/distributed-cache/.idea/misc.xml @@ -0,0 +1,14 @@ + + + + + + + + + + \ No newline at end of file diff --git a/distributed-cache/.idea/uiDesigner.xml b/distributed-cache/.idea/uiDesigner.xml new file mode 100644 index 0000000..e96534f --- /dev/null +++ b/distributed-cache/.idea/uiDesigner.xml @@ -0,0 +1,124 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/distributed-cache/.idea/vcs.xml b/distributed-cache/.idea/vcs.xml new file mode 100644 index 0000000..6c0b863 --- /dev/null +++ b/distributed-cache/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/distributed-cache/DistributedCache.iml b/distributed-cache/DistributedCache.iml new file mode 100644 index 0000000..78b2cc5 --- /dev/null +++ b/distributed-cache/DistributedCache.iml @@ -0,0 +1,2 @@ + + \ No newline at end of file diff --git a/distributed-cache/Requirements.ME b/distributed-cache/Requirements.ME new file mode 100644 index 0000000..b64fdad --- /dev/null +++ b/distributed-cache/Requirements.ME @@ -0,0 +1,9 @@ +Should allow: + 1) listeners on load and evict + 2) hot loading elements on startup + 3) multiple eviction algorithms like LRU and LFU + 4) expiration time + 5) multiple fetch algorithms like write back and write through + 6) return futures + 7) request collapsing + 8) Avoid thrashing with rate limiting \ No newline at end of file diff --git a/distributed-cache/pom.xml b/distributed-cache/pom.xml new file mode 100644 index 0000000..86533e0 --- /dev/null +++ b/distributed-cache/pom.xml @@ -0,0 +1,32 @@ + + + 4.0.0 + + interviewready.io + distributed-cache + 1.0 + + + + org.apache.maven.plugins + maven-compiler-plugin + + 11 + 11 + + + + + + + + junit + junit + 4.13 + test + + + + \ No newline at end of file diff --git a/distributed-cache/src/main/java/Cache.java b/distributed-cache/src/main/java/Cache.java new file mode 100644 index 0000000..167aa5f --- /dev/null +++ b/distributed-cache/src/main/java/Cache.java @@ -0,0 +1,179 @@ +import events.*; +import models.*; + +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.*; +import java.util.function.Function; + +public class Cache { + private final int maximumSize; + private final FetchAlgorithm fetchAlgorithm; + private final Duration expiryTime; + private final Map>> cache; + private final ConcurrentSkipListMap> priorityQueue; + private final ConcurrentSkipListMap> expiryQueue; + private final DataSource dataSource; + private final List> eventQueue; + private final ExecutorService[] executorPool; + private final Timer timer; + + protected Cache(final int maximumSize, + final Duration expiryTime, + final FetchAlgorithm fetchAlgorithm, + final EvictionAlgorithm evictionAlgorithm, + final DataSource dataSource, + final Set keysToEagerlyLoad, + final Timer timer, + final int poolSize) { + this.expiryTime = expiryTime; + this.maximumSize = maximumSize; + this.fetchAlgorithm = fetchAlgorithm; + this.timer = timer; + this.cache = new ConcurrentHashMap<>(); + this.eventQueue = new CopyOnWriteArrayList<>(); + this.dataSource = dataSource; + this.executorPool = new ExecutorService[poolSize]; + for (int i = 0; i < poolSize; i++) { + executorPool[i] = Executors.newSingleThreadExecutor(); + } + priorityQueue = new ConcurrentSkipListMap<>((first, second) -> { + final var accessTimeDifference = (int) (first.getLastAccessTime() - second.getLastAccessTime()); + if (evictionAlgorithm.equals(EvictionAlgorithm.LRU)) { + return accessTimeDifference; + } else { + final var accessCountDifference = first.getAccessCount() - second.getAccessCount(); + return accessCountDifference != 0 ? accessCountDifference : accessTimeDifference; + } + }); + expiryQueue = new ConcurrentSkipListMap<>(); + final var eagerLoading = keysToEagerlyLoad.stream() + .map(key -> getThreadFor(key, addToCache(key, loadFromDB(dataSource, key)))) + .toArray(CompletableFuture[]::new); + CompletableFuture.allOf(eagerLoading).join(); + } + + private CompletionStage getThreadFor(KEY key, CompletionStage task) { + return CompletableFuture.supplyAsync(() -> task, executorPool[Math.abs(key.hashCode() % executorPool.length)]).thenCompose(Function.identity()); + } + + public CompletionStage get(KEY key) { + return getThreadFor(key, getFromCache(key)); + } + + public CompletionStage set(KEY key, VALUE value) { + return getThreadFor(key, setInCache(key, value)); + } + + private CompletionStage getFromCache(KEY key) { + final CompletionStage> result; + if (!cache.containsKey(key)) { + result = addToCache(key, loadFromDB(dataSource, key)); + } else { + result = cache.get(key).thenCompose(record -> { + if (hasExpired(record)) { + priorityQueue.get(record.getAccessDetails()).remove(key); + expiryQueue.get(record.getInsertionTime()).remove(key); + eventQueue.add(new Eviction<>(record, Eviction.Type.EXPIRY, timer.getCurrentTime())); + return addToCache(key, loadFromDB(dataSource, key)); + } else { + return CompletableFuture.completedFuture(record); + } + }); + } + return result.thenApply(record -> { + priorityQueue.get(record.getAccessDetails()).remove(key); + final AccessDetails updatedAccessDetails = record.getAccessDetails().update(timer.getCurrentTime()); + priorityQueue.putIfAbsent(updatedAccessDetails, new CopyOnWriteArrayList<>()); + priorityQueue.get(updatedAccessDetails).add(key); + record.setAccessDetails(updatedAccessDetails); + return record.getValue(); + }); + } + + public CompletionStage setInCache(KEY key, VALUE value) { + CompletionStage result = CompletableFuture.completedFuture(null); + if (cache.containsKey(key)) { + result = cache.remove(key) + .thenAccept(oldRecord -> { + priorityQueue.get(oldRecord.getAccessDetails()).remove(key); + expiryQueue.get(oldRecord.getInsertionTime()).remove(key); + if (hasExpired(oldRecord)) { + eventQueue.add(new Eviction<>(oldRecord, Eviction.Type.EXPIRY, timer.getCurrentTime())); + } else { + eventQueue.add(new Update<>(new Record<>(key, value, timer.getCurrentTime()), oldRecord, timer.getCurrentTime())); + } + }); + } + return result.thenCompose(__ -> addToCache(key, CompletableFuture.completedFuture(value))).thenCompose(record -> { + final CompletionStage writeOperation = persistRecord(record); + return fetchAlgorithm == FetchAlgorithm.WRITE_THROUGH ? writeOperation : CompletableFuture.completedFuture(null); + }); + } + + private CompletionStage> addToCache(final KEY key, final CompletionStage valueFuture) { + manageEntries(); + final var recordFuture = valueFuture.thenApply(value -> { + final Record record = new Record<>(key, value, timer.getCurrentTime()); + expiryQueue.putIfAbsent(record.getInsertionTime(), new CopyOnWriteArrayList<>()); + expiryQueue.get(record.getInsertionTime()).add(key); + priorityQueue.putIfAbsent(record.getAccessDetails(), new CopyOnWriteArrayList<>()); + priorityQueue.get(record.getAccessDetails()).add(key); + return record; + }); + cache.put(key, recordFuture); + return recordFuture; + } + + private synchronized void manageEntries() { + if (cache.size() >= maximumSize) { + while (!expiryQueue.isEmpty() && hasExpired(expiryQueue.firstKey())) { + final List keys = expiryQueue.pollFirstEntry().getValue(); + for (final KEY key : keys) { + final Record expiredRecord = cache.remove(key).toCompletableFuture().join(); + priorityQueue.remove(expiredRecord.getAccessDetails()); + eventQueue.add(new Eviction<>(expiredRecord, Eviction.Type.EXPIRY, timer.getCurrentTime())); + } + } + } + if (cache.size() >= maximumSize) { + List keys = priorityQueue.pollFirstEntry().getValue(); + while (keys.isEmpty()) { + keys = priorityQueue.pollFirstEntry().getValue(); + } + for (final KEY key : keys) { + final Record lowestPriorityRecord = cache.remove(key).toCompletableFuture().join(); + expiryQueue.get(lowestPriorityRecord.getInsertionTime()).remove(lowestPriorityRecord.getKey()); + eventQueue.add(new Eviction<>(lowestPriorityRecord, Eviction.Type.REPLACEMENT, timer.getCurrentTime())); + } + } + } + + private CompletionStage persistRecord(final Record record) { + return dataSource.persist(record.getKey(), record.getValue(), record.getInsertionTime()) + .thenAccept(__ -> eventQueue.add(new Write<>(record, timer.getCurrentTime()))); + } + + private boolean hasExpired(final Record record) { + return hasExpired(record.getInsertionTime()); + } + + private boolean hasExpired(final Long time) { + return Duration.ofNanos(timer.getCurrentTime() - time).compareTo(expiryTime) > 0; + } + + public List> getEventQueue() { + return eventQueue; + } + + private CompletionStage loadFromDB(final DataSource dataSource, KEY key) { + return dataSource.load(key).whenComplete((value, throwable) -> { + if (throwable == null) { + eventQueue.add(new Load<>(new Record<>(key, value, timer.getCurrentTime()), timer.getCurrentTime())); + } + }); + } +} + diff --git a/distributed-cache/src/main/java/CacheBuilder.java b/distributed-cache/src/main/java/CacheBuilder.java new file mode 100644 index 0000000..4dbee63 --- /dev/null +++ b/distributed-cache/src/main/java/CacheBuilder.java @@ -0,0 +1,75 @@ +import models.EvictionAlgorithm; +import models.FetchAlgorithm; +import models.Timer; + +import java.time.Duration; +import java.util.HashSet; +import java.util.Set; + +public class CacheBuilder { + private int maximumSize; + private Duration expiryTime; + private final Set onStartLoad; + private EvictionAlgorithm evictionAlgorithm; + private FetchAlgorithm fetchAlgorithm; + private DataSource dataSource; + private Timer timer; + private int poolSize; + + public CacheBuilder() { + maximumSize = 1000; + expiryTime = Duration.ofDays(365); + fetchAlgorithm = FetchAlgorithm.WRITE_THROUGH; + evictionAlgorithm = EvictionAlgorithm.LRU; + onStartLoad = new HashSet<>(); + poolSize = 1; + timer = new Timer(); + } + + public CacheBuilder maximumSize(final int maximumSize) { + this.maximumSize = maximumSize; + return this; + } + + public CacheBuilder expiryTime(final Duration expiryTime) { + this.expiryTime = expiryTime; + return this; + } + + public CacheBuilder loadKeysOnStart(final Set keys) { + this.onStartLoad.addAll(keys); + return this; + } + + public CacheBuilder evictionAlgorithm(final EvictionAlgorithm evictionAlgorithm) { + this.evictionAlgorithm = evictionAlgorithm; + return this; + } + + public CacheBuilder fetchAlgorithm(final FetchAlgorithm fetchAlgorithm) { + this.fetchAlgorithm = fetchAlgorithm; + return this; + } + + public CacheBuilder dataSource(final DataSource dataSource) { + this.dataSource = dataSource; + return this; + } + + public CacheBuilder timer(final Timer timer) { + this.timer = timer; + return this; + } + + public CacheBuilder poolSize(final int poolSize) { + this.poolSize = poolSize; + return this; + } + + public Cache build() { + if (dataSource == null) { + throw new IllegalArgumentException("No datasource configured"); + } + return new Cache<>(maximumSize, expiryTime, fetchAlgorithm, evictionAlgorithm, dataSource, onStartLoad, timer, poolSize); + } +} diff --git a/distributed-cache/src/main/java/DataSource.java b/distributed-cache/src/main/java/DataSource.java new file mode 100644 index 0000000..7a891ab --- /dev/null +++ b/distributed-cache/src/main/java/DataSource.java @@ -0,0 +1,8 @@ +import java.util.concurrent.CompletionStage; + +public interface DataSource { + + CompletionStage load(KEY key); + + CompletionStage persist(KEY key, VALUE value, long timestamp); +} diff --git a/distributed-cache/src/main/java/events/Event.java b/distributed-cache/src/main/java/events/Event.java new file mode 100644 index 0000000..44bdbf9 --- /dev/null +++ b/distributed-cache/src/main/java/events/Event.java @@ -0,0 +1,37 @@ +package events; + +import models.Record; + +import java.util.UUID; + +public abstract class Event { + private final String id; + private final Record element; + private final long timestamp; + + public Event(Record element, long timestamp) { + this.element = element; + this.timestamp = timestamp; + id = UUID.randomUUID().toString(); + } + + public String getId() { + return id; + } + + public Record getElement() { + return element; + } + + public long getTimestamp() { + return timestamp; + } + + @Override + public String toString() { + return this.getClass().getName() + "{" + + "element=" + element + + ", timestamp=" + timestamp + + "}\n"; + } +} diff --git a/distributed-cache/src/main/java/events/Eviction.java b/distributed-cache/src/main/java/events/Eviction.java new file mode 100644 index 0000000..f5b8b3b --- /dev/null +++ b/distributed-cache/src/main/java/events/Eviction.java @@ -0,0 +1,28 @@ +package events; + +import models.Record; + +public class Eviction extends Event { + private final Type type; + + public Eviction(Record element, Type type, long timestamp) { + super(element, timestamp); + this.type = type; + } + + public Type getType() { + return type; + } + + public enum Type { + EXPIRY, REPLACEMENT + } + + @Override + public String toString() { + return "Eviction{" + + "type=" + type + + ", "+super.toString() + + "}\n"; + } +} diff --git a/distributed-cache/src/main/java/events/Load.java b/distributed-cache/src/main/java/events/Load.java new file mode 100644 index 0000000..b2736e8 --- /dev/null +++ b/distributed-cache/src/main/java/events/Load.java @@ -0,0 +1,10 @@ +package events; + +import models.Record; + +public class Load extends Event { + + public Load(Record element, long timestamp) { + super(element, timestamp); + } +} diff --git a/distributed-cache/src/main/java/events/Update.java b/distributed-cache/src/main/java/events/Update.java new file mode 100644 index 0000000..90a922b --- /dev/null +++ b/distributed-cache/src/main/java/events/Update.java @@ -0,0 +1,13 @@ +package events; + +import models.Record; + +public class Update extends Event { + + private final Record previousValue; + + public Update(Record element, Record previousValue, long timestamp) { + super(element, timestamp); + this.previousValue = previousValue; + } +} diff --git a/distributed-cache/src/main/java/events/Write.java b/distributed-cache/src/main/java/events/Write.java new file mode 100644 index 0000000..34888ba --- /dev/null +++ b/distributed-cache/src/main/java/events/Write.java @@ -0,0 +1,10 @@ +package events; + +import models.Record; + +public class Write extends Event { + + public Write(Record element, long timestamp) { + super(element, timestamp); + } +} diff --git a/distributed-cache/src/main/java/models/AccessDetails.java b/distributed-cache/src/main/java/models/AccessDetails.java new file mode 100644 index 0000000..ef6e041 --- /dev/null +++ b/distributed-cache/src/main/java/models/AccessDetails.java @@ -0,0 +1,50 @@ +package models; + +import java.util.Objects; +import java.util.concurrent.atomic.LongAdder; + +public class AccessDetails { + private final LongAdder accessCount; + private long lastAccessTime; + + public AccessDetails(long lastAccessTime) { + accessCount = new LongAdder(); + this.lastAccessTime = lastAccessTime; + } + + public long getLastAccessTime() { + return lastAccessTime; + } + + public int getAccessCount() { + return (int) accessCount.longValue(); + } + + public AccessDetails update(long lastAccessTime) { + final AccessDetails accessDetails = new AccessDetails(lastAccessTime); + accessDetails.accessCount.add(this.accessCount.longValue() + 1); + return accessDetails; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + AccessDetails that = (AccessDetails) o; + return lastAccessTime == that.lastAccessTime && + this.getAccessCount() == that.getAccessCount(); + } + + @Override + public int hashCode() { + return Objects.hash(getAccessCount(), lastAccessTime); + } + + @Override + public String toString() { + return "AccessDetails{" + + "accessCount=" + accessCount + + ", lastAccessTime=" + lastAccessTime + + '}'; + } +} diff --git a/distributed-cache/src/main/java/models/EvictionAlgorithm.java b/distributed-cache/src/main/java/models/EvictionAlgorithm.java new file mode 100644 index 0000000..43ceba2 --- /dev/null +++ b/distributed-cache/src/main/java/models/EvictionAlgorithm.java @@ -0,0 +1,5 @@ +package models; + +public enum EvictionAlgorithm { + LRU, LFU +} diff --git a/distributed-cache/src/main/java/models/FetchAlgorithm.java b/distributed-cache/src/main/java/models/FetchAlgorithm.java new file mode 100644 index 0000000..4911e69 --- /dev/null +++ b/distributed-cache/src/main/java/models/FetchAlgorithm.java @@ -0,0 +1,5 @@ +package models; + +public enum FetchAlgorithm { + WRITE_THROUGH, WRITE_BACK +} diff --git a/distributed-cache/src/main/java/models/Record.java b/distributed-cache/src/main/java/models/Record.java new file mode 100644 index 0000000..51ff658 --- /dev/null +++ b/distributed-cache/src/main/java/models/Record.java @@ -0,0 +1,46 @@ +package models; + +public class Record { + private final KEY key; + private final VALUE value; + private final long insertionTime; + private AccessDetails accessDetails; + + public Record(KEY key, VALUE value, long insertionTime) { + this.key = key; + this.value = value; + this.insertionTime = insertionTime; + this.accessDetails = new AccessDetails(insertionTime); + } + + public KEY getKey() { + return key; + } + + public VALUE getValue() { + return value; + } + + public long getInsertionTime() { + return insertionTime; + } + + public AccessDetails getAccessDetails() { + return accessDetails; + } + + public void setAccessDetails(final AccessDetails accessDetails) { + this.accessDetails = accessDetails; + } + + @Override + public String toString() { + return "Record{" + + "key=" + key + + ", value=" + value + + ", insertionTime=" + insertionTime + + ", accessDetails=" + accessDetails + + '}'; + } +} + diff --git a/distributed-cache/src/main/java/models/Timer.java b/distributed-cache/src/main/java/models/Timer.java new file mode 100644 index 0000000..ae0746c --- /dev/null +++ b/distributed-cache/src/main/java/models/Timer.java @@ -0,0 +1,7 @@ +package models; + +public class Timer { + public long getCurrentTime() { + return System.nanoTime(); + } +} diff --git a/distributed-cache/src/test/java/TestCache.java b/distributed-cache/src/test/java/TestCache.java new file mode 100644 index 0000000..5340ba9 --- /dev/null +++ b/distributed-cache/src/test/java/TestCache.java @@ -0,0 +1,366 @@ +import events.Eviction; +import events.Load; +import events.Update; +import events.Write; +import models.EvictionAlgorithm; +import models.FetchAlgorithm; +import models.SettableTimer; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.time.Duration; +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +public class TestCache { + + private static final String PROFILE_MUMBAI_ENGINEER = "profile_mumbai_engineer", PROFILE_HYDERABAD_ENGINEER = "profile_hyderabad_engineer"; + private final Map dataMap = new ConcurrentHashMap<>(); + private DataSource dataSource; + private final Queue> writeOperations = new LinkedList<>(); + private DataSource writeBackDataSource; + + @Before + public void setUp() { + dataMap.clear(); + writeOperations.clear(); + dataMap.put(PROFILE_MUMBAI_ENGINEER, "violet"); + dataMap.put(PROFILE_HYDERABAD_ENGINEER, "blue"); + dataSource = new DataSource<>() { + @Override + public CompletionStage load(String key) { + if (dataMap.containsKey(key)) { + return CompletableFuture.completedFuture(dataMap.get(key)); + } else { + return CompletableFuture.failedStage(new NullPointerException()); + } + } + + @Override + public CompletionStage persist(String key, String value, long timestamp) { + dataMap.put(key, value); + return CompletableFuture.completedFuture(null); + } + }; + + writeBackDataSource = new DataSource<>() { + @Override + public CompletionStage load(String key) { + if (dataMap.containsKey(key)) { + return CompletableFuture.completedFuture(dataMap.get(key)); + } else { + return CompletableFuture.failedStage(new NullPointerException()); + } + } + + @Override + public CompletionStage persist(String key, String value, long timestamp) { + final CompletableFuture hold = new CompletableFuture<>(); + writeOperations.add(hold); + return hold.thenAccept(__ -> dataMap.put(key, value)); + } + }; + } + + private void acceptWrite() { + final CompletableFuture write = writeOperations.poll(); + if (write != null) { + write.complete(null); + } + } + + @Test(expected = IllegalArgumentException.class) + public void testCacheConstructionWithoutDataSourceFailure() { + new CacheBuilder<>().build(); + } + + @Test + public void testCacheDefaultBehavior() throws ExecutionException, InterruptedException { + final var cache = new CacheBuilder().dataSource(dataSource).build(); + Assert.assertNotNull(cache); + assert isEqualTo(cache.get(PROFILE_MUMBAI_ENGINEER), "violet"); + assert cache.get("random") + .exceptionally(throwable -> Boolean.TRUE.toString()) + .thenApply(Boolean::valueOf) + .toCompletableFuture() + .get(); + assert isEqualTo(cache.set(PROFILE_MUMBAI_ENGINEER, "brown").thenCompose(__ -> cache.get(PROFILE_MUMBAI_ENGINEER)), "brown"); + Assert.assertEquals(3, cache.getEventQueue().size()); + assert cache.getEventQueue().get(0) instanceof Load; + assert cache.getEventQueue().get(1) instanceof Update; + assert cache.getEventQueue().get(2) instanceof Write; + } + + @Test + public void Eviction_LRU() { + final var maximumSize = 2; + final var cache = new CacheBuilder() + .maximumSize(maximumSize) + .evictionAlgorithm(EvictionAlgorithm.LRU) + .fetchAlgorithm(FetchAlgorithm.WRITE_BACK) + .dataSource(writeBackDataSource).build(); + cache.get(PROFILE_MUMBAI_ENGINEER).toCompletableFuture().join(); + for (int i = 0; i < maximumSize; i++) { + cache.set("key" + i, "value" + i).toCompletableFuture().join(); + } + Assert.assertEquals(2, cache.getEventQueue().size()); + assert cache.getEventQueue().get(0) instanceof Load; + assert cache.getEventQueue().get(1) instanceof Eviction; + final var evictionEvent = (Eviction) cache.getEventQueue().get(1); + Assert.assertEquals(Eviction.Type.REPLACEMENT, evictionEvent.getType()); + Assert.assertEquals(PROFILE_MUMBAI_ENGINEER, evictionEvent.getElement().getKey()); + cache.getEventQueue().clear(); + final var permutation = new ArrayList(); + for (int i = 0; i < maximumSize; i++) { + permutation.add(i); + } + Collections.shuffle(permutation); + for (final int index : permutation) { + cache.get("key" + index).toCompletableFuture().join(); + } + for (int i = 0; i < maximumSize; i++) { + cache.set("random" + permutation.get(i), "random_value").toCompletableFuture().join(); + assert cache.getEventQueue().get(i) instanceof Eviction; + final var eviction = (Eviction) cache.getEventQueue().get(i); + Assert.assertEquals(Eviction.Type.REPLACEMENT, eviction.getType()); + Assert.assertEquals("key" + permutation.get(i), eviction.getElement().getKey()); + } + } + + @Test + public void Eviction_LFU() { + final var maximumSize = 2; + final var cache = new CacheBuilder() + .maximumSize(maximumSize) + .evictionAlgorithm(EvictionAlgorithm.LFU) + .fetchAlgorithm(FetchAlgorithm.WRITE_BACK) + .dataSource(writeBackDataSource) + .build(); + cache.get(PROFILE_MUMBAI_ENGINEER).toCompletableFuture().join(); + for (int i = 0; i < maximumSize; i++) { + cache.set("key" + i, "value" + i).toCompletableFuture().join(); + } + Assert.assertEquals(2, cache.getEventQueue().size()); + assert cache.getEventQueue().get(0) instanceof Load; + assert cache.getEventQueue().get(1) instanceof Eviction; + final var evictionEvent = (Eviction) cache.getEventQueue().get(1); + Assert.assertEquals(Eviction.Type.REPLACEMENT, evictionEvent.getType()); + Assert.assertEquals("key0", evictionEvent.getElement().getKey()); + for (int i = 0; i < maximumSize; i++) { + acceptWrite(); + } + final var permutation = new ArrayList(); + for (int i = 0; i < maximumSize; i++) { + permutation.add(i); + } + Collections.shuffle(permutation); + for (final int index : permutation) { + for (int i = 0; i <= index; i++) { + cache.get("key" + index).toCompletableFuture().join(); + } + } + cache.getEventQueue().clear(); + for (int i = 0; i < maximumSize; i++) { + cache.set("random" + i, "random_value").toCompletableFuture().join(); + acceptWrite(); + for (int j = 0; j <= maximumSize; j++) { + cache.get("random" + i).toCompletableFuture().join(); + } + Assert.assertEquals(Eviction.class.getName(), cache.getEventQueue().get(i * 2).getClass().getName()); + Assert.assertEquals(Write.class.getName(), cache.getEventQueue().get(i * 2 + 1).getClass().getName()); + final var eviction = (Eviction) cache.getEventQueue().get(i * 2); + System.out.println(cache.getEventQueue().get(i)); + Assert.assertEquals(Eviction.Type.REPLACEMENT, eviction.getType()); + Assert.assertEquals("key" + i, eviction.getElement().getKey()); + } + } + + @Test + public void ExpiryOnGet() { + final var timer = new SettableTimer(); + final var startTime = System.nanoTime(); + final var cache = new CacheBuilder().timer(timer).dataSource(dataSource).expiryTime(Duration.ofSeconds(10)).build(); + timer.setTime(startTime); + cache.get(PROFILE_MUMBAI_ENGINEER).toCompletableFuture().join(); + Assert.assertEquals(1, cache.getEventQueue().size()); + assert cache.getEventQueue().get(0) instanceof Load; + Assert.assertEquals(PROFILE_MUMBAI_ENGINEER, cache.getEventQueue().get(0).getElement().getKey()); + timer.setTime(startTime + Duration.ofSeconds(10).toNanos() + 1); + cache.get(PROFILE_MUMBAI_ENGINEER).toCompletableFuture().join(); + Assert.assertEquals(3, cache.getEventQueue().size()); + assert cache.getEventQueue().get(1) instanceof Eviction; + assert cache.getEventQueue().get(2) instanceof Load; + final var eviction = (Eviction) cache.getEventQueue().get(1); + Assert.assertEquals(Eviction.Type.EXPIRY, eviction.getType()); + Assert.assertEquals(PROFILE_MUMBAI_ENGINEER, eviction.getElement().getKey()); + } + + @Test + public void ExpiryOnSet() { + final var timer = new SettableTimer(); + final var startTime = System.nanoTime(); + final var cache = new CacheBuilder().timer(timer).dataSource(dataSource).expiryTime(Duration.ofSeconds(10)).build(); + timer.setTime(startTime); + cache.get(PROFILE_MUMBAI_ENGINEER).toCompletableFuture().join(); + Assert.assertEquals(1, cache.getEventQueue().size()); + assert cache.getEventQueue().get(0) instanceof Load; + Assert.assertEquals(PROFILE_MUMBAI_ENGINEER, cache.getEventQueue().get(0).getElement().getKey()); + timer.setTime(startTime + Duration.ofSeconds(10).toNanos() + 1); + cache.set(PROFILE_MUMBAI_ENGINEER, "blue").toCompletableFuture().join(); + Assert.assertEquals(3, cache.getEventQueue().size()); + assert cache.getEventQueue().get(1) instanceof Eviction; + assert cache.getEventQueue().get(2) instanceof Write; + final var eviction = (Eviction) cache.getEventQueue().get(1); + Assert.assertEquals(Eviction.Type.EXPIRY, eviction.getType()); + Assert.assertEquals(PROFILE_MUMBAI_ENGINEER, eviction.getElement().getKey()); + } + + @Test + public void ExpiryOnEviction() { + final var timer = new SettableTimer(); + final var startTime = System.nanoTime(); + final var cache = new CacheBuilder().maximumSize(2).timer(timer).dataSource(dataSource).expiryTime(Duration.ofSeconds(10)).build(); + timer.setTime(startTime); + cache.get(PROFILE_MUMBAI_ENGINEER).toCompletableFuture().join(); + cache.get(PROFILE_HYDERABAD_ENGINEER).toCompletableFuture().join(); + timer.setTime(startTime + Duration.ofSeconds(10).toNanos() + 1); + cache.set("randomKey", "randomValue").toCompletableFuture().join(); + Assert.assertEquals(5, cache.getEventQueue().size()); + assert cache.getEventQueue().get(2) instanceof Eviction; + assert cache.getEventQueue().get(3) instanceof Eviction; + assert cache.getEventQueue().get(4) instanceof Write; + final var eviction1 = (Eviction) cache.getEventQueue().get(2); + Assert.assertEquals(Eviction.Type.EXPIRY, eviction1.getType()); + Assert.assertEquals(PROFILE_MUMBAI_ENGINEER, eviction1.getElement().getKey()); + final var eviction2 = (Eviction) cache.getEventQueue().get(3); + Assert.assertEquals(Eviction.Type.EXPIRY, eviction2.getType()); + Assert.assertEquals(PROFILE_HYDERABAD_ENGINEER, eviction2.getElement().getKey()); + } + + @Test + public void FetchingWriteBack() { + final var cache = new CacheBuilder() + .maximumSize(1) + .dataSource(writeBackDataSource) + .fetchAlgorithm(FetchAlgorithm.WRITE_BACK) + .build(); + cache.set("randomKey", "randomValue").toCompletableFuture().join(); + Assert.assertEquals(0, cache.getEventQueue().size()); + Assert.assertNull(dataMap.get("randomValue")); + acceptWrite(); + } + + @Test + public void FetchingWriteThrough() { + final var cache = new CacheBuilder().dataSource(dataSource).fetchAlgorithm(FetchAlgorithm.WRITE_THROUGH).build(); + cache.set("randomKey", "randomValue").toCompletableFuture().join(); + Assert.assertEquals(1, cache.getEventQueue().size()); + assert cache.getEventQueue().get(0) instanceof Write; + Assert.assertEquals("randomValue", dataMap.get("randomKey")); + } + + @Test + public void EagerLoading() { + final var eagerlyLoad = new HashSet(); + eagerlyLoad.add(PROFILE_MUMBAI_ENGINEER); + eagerlyLoad.add(PROFILE_HYDERABAD_ENGINEER); + final var cache = new CacheBuilder() + .loadKeysOnStart(eagerlyLoad) + .dataSource(dataSource) + .build(); + Assert.assertEquals(2, cache.getEventQueue().size()); + assert cache.getEventQueue().get(0) instanceof Load; + assert cache.getEventQueue().get(1) instanceof Load; + cache.getEventQueue().clear(); + dataMap.clear(); + isEqualTo(cache.get(PROFILE_MUMBAI_ENGINEER), "violet"); + isEqualTo(cache.get(PROFILE_HYDERABAD_ENGINEER), "blue"); + Assert.assertEquals(0, cache.getEventQueue().size()); + } + + @Test + public void RaceConditions() throws ExecutionException, InterruptedException { + final var cache = new CacheBuilder() + .poolSize(8) + .dataSource(dataSource).build(); + final var cacheEntries = new HashMap>(); + final var numberOfEntries = 100; + final var numberOfValues = 1000; + final String[] keyList = new String[numberOfEntries]; + final Map inverseMapping = new HashMap<>(); + for (int entry = 0; entry < numberOfEntries; entry++) { + final var key = UUID.randomUUID().toString(); + keyList[entry] = key; + inverseMapping.put(key, entry); + cacheEntries.put(key, new ArrayList<>()); + final var firstValue = UUID.randomUUID().toString(); + dataMap.put(key, firstValue); + cacheEntries.get(key).add(firstValue); + for (int value = 0; value < numberOfValues - 1; value++) { + cacheEntries.get(key).add(UUID.randomUUID().toString()); + } + } + final Random random = new Random(); + final List> futures = new ArrayList<>(); + final List queries = new ArrayList<>(); + final int[] updates = new int[numberOfEntries]; + for (int i = 0; i < 1000000; i++) { + final var index = random.nextInt(numberOfEntries); + final var key = keyList[index]; + if (Math.random() <= 0.05) { + if (updates[index] - 1 < numberOfEntries) { + updates[index]++; + } + cache.set(key, cacheEntries.get(key).get(updates[index] + 1)); + } else { + queries.add(key); + futures.add(cache.get(key)); + } + } + final CompletionStage> results = CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)) + .thenApply(__ -> futures.stream() + .map(CompletionStage::toCompletableFuture) + .map(CompletableFuture::join) + .collect(Collectors.toList())); + final int[] currentIndexes = new int[numberOfEntries]; + final StringBuilder stringBuilder = new StringBuilder(); + results.thenAccept(values -> { + for (int i = 0; i < values.size(); i++) { + final var key = queries.get(i); + final var possibleValuesForKey = cacheEntries.get(key); + final var currentValue = currentIndexes[inverseMapping.get(key)]; + if (!possibleValuesForKey.get(currentValue).equals(values.get(i))) { + int offset = 1; + while (currentValue + offset < numberOfValues && !possibleValuesForKey.get(currentValue + offset).equals(values.get(i))) { + offset++; + } + if (currentValue + offset == numberOfValues) { + System.out.println(Arrays.stream(stringBuilder.toString().split("\n")).filter(line -> line.contains(key)).collect(Collectors.joining("\n"))); + System.err.println(key); + System.err.println(possibleValuesForKey); + System.err.println(possibleValuesForKey.get(currentValue) + " index: " + currentIndexes[inverseMapping.get(key)]); + System.err.println(values.get(i)); + throw new IllegalStateException(); + } + currentIndexes[inverseMapping.get(key)] += offset; + stringBuilder.append(key).append(" index: ").append(currentIndexes[inverseMapping.get(key)]).append(" ").append(values.get(i)).append('\n'); + } + } + }).toCompletableFuture().join(); + } + + private boolean isEqualTo(CompletionStage future, String value) { + return future.thenApply(result -> { + if (result.equals(value)) { + return true; + } else { + throw new AssertionError(); + } + }).toCompletableFuture().join(); + } +} diff --git a/distributed-cache/src/test/java/models/SettableTimer.java b/distributed-cache/src/test/java/models/SettableTimer.java new file mode 100644 index 0000000..08c4111 --- /dev/null +++ b/distributed-cache/src/test/java/models/SettableTimer.java @@ -0,0 +1,14 @@ +package models; + +public class SettableTimer extends Timer { + private long time = -1; + + @Override + public long getCurrentTime() { + return time == -1 ? System.nanoTime() : time; + } + + public void setTime(long time) { + this.time = time; + } +} diff --git a/distributed-cache/target/classes/Cache.class b/distributed-cache/target/classes/Cache.class new file mode 100644 index 0000000..d0cde8b Binary files /dev/null and b/distributed-cache/target/classes/Cache.class differ diff --git a/distributed-cache/target/classes/CacheBuilder.class b/distributed-cache/target/classes/CacheBuilder.class new file mode 100644 index 0000000..16194b6 Binary files /dev/null and b/distributed-cache/target/classes/CacheBuilder.class differ diff --git a/distributed-cache/target/classes/DataSource.class b/distributed-cache/target/classes/DataSource.class new file mode 100644 index 0000000..4c1ecc3 Binary files /dev/null and b/distributed-cache/target/classes/DataSource.class differ diff --git a/distributed-cache/target/classes/events/Event.class b/distributed-cache/target/classes/events/Event.class new file mode 100644 index 0000000..1dc9d49 Binary files /dev/null and b/distributed-cache/target/classes/events/Event.class differ diff --git a/distributed-cache/target/classes/events/Eviction$Type.class b/distributed-cache/target/classes/events/Eviction$Type.class new file mode 100644 index 0000000..d24f558 Binary files /dev/null and b/distributed-cache/target/classes/events/Eviction$Type.class differ diff --git a/distributed-cache/target/classes/events/Eviction.class b/distributed-cache/target/classes/events/Eviction.class new file mode 100644 index 0000000..cc7f8f6 Binary files /dev/null and b/distributed-cache/target/classes/events/Eviction.class differ diff --git a/distributed-cache/target/classes/events/Load.class b/distributed-cache/target/classes/events/Load.class new file mode 100644 index 0000000..aa916aa Binary files /dev/null and b/distributed-cache/target/classes/events/Load.class differ diff --git a/distributed-cache/target/classes/events/Update.class b/distributed-cache/target/classes/events/Update.class new file mode 100644 index 0000000..e745786 Binary files /dev/null and b/distributed-cache/target/classes/events/Update.class differ diff --git a/distributed-cache/target/classes/events/Write.class b/distributed-cache/target/classes/events/Write.class new file mode 100644 index 0000000..ff5ad21 Binary files /dev/null and b/distributed-cache/target/classes/events/Write.class differ diff --git a/distributed-cache/target/classes/models/AccessDetails.class b/distributed-cache/target/classes/models/AccessDetails.class new file mode 100644 index 0000000..c0c58e7 Binary files /dev/null and b/distributed-cache/target/classes/models/AccessDetails.class differ diff --git a/distributed-cache/target/classes/models/EvictionAlgorithm.class b/distributed-cache/target/classes/models/EvictionAlgorithm.class new file mode 100644 index 0000000..c4422ee Binary files /dev/null and b/distributed-cache/target/classes/models/EvictionAlgorithm.class differ diff --git a/distributed-cache/target/classes/models/FetchAlgorithm.class b/distributed-cache/target/classes/models/FetchAlgorithm.class new file mode 100644 index 0000000..d174340 Binary files /dev/null and b/distributed-cache/target/classes/models/FetchAlgorithm.class differ diff --git a/distributed-cache/target/classes/models/Record.class b/distributed-cache/target/classes/models/Record.class new file mode 100644 index 0000000..5746ab4 Binary files /dev/null and b/distributed-cache/target/classes/models/Record.class differ diff --git a/distributed-cache/target/classes/models/Timer.class b/distributed-cache/target/classes/models/Timer.class new file mode 100644 index 0000000..65e9434 Binary files /dev/null and b/distributed-cache/target/classes/models/Timer.class differ diff --git a/distributed-cache/target/test-classes/TestCache$1.class b/distributed-cache/target/test-classes/TestCache$1.class new file mode 100644 index 0000000..f1fa61b Binary files /dev/null and b/distributed-cache/target/test-classes/TestCache$1.class differ diff --git a/distributed-cache/target/test-classes/TestCache$2.class b/distributed-cache/target/test-classes/TestCache$2.class new file mode 100644 index 0000000..fb22340 Binary files /dev/null and b/distributed-cache/target/test-classes/TestCache$2.class differ diff --git a/distributed-cache/target/test-classes/TestCache.class b/distributed-cache/target/test-classes/TestCache.class new file mode 100644 index 0000000..14759e6 Binary files /dev/null and b/distributed-cache/target/test-classes/TestCache.class differ diff --git a/distributed-cache/target/test-classes/models/SettableTimer.class b/distributed-cache/target/test-classes/models/SettableTimer.class new file mode 100644 index 0000000..0e2dfae Binary files /dev/null and b/distributed-cache/target/test-classes/models/SettableTimer.class differ diff --git a/distributed-event-bus/.idea/.gitignore b/distributed-event-bus/.idea/.gitignore new file mode 100644 index 0000000..26d3352 --- /dev/null +++ b/distributed-event-bus/.idea/.gitignore @@ -0,0 +1,3 @@ +# Default ignored files +/shelf/ +/workspace.xml diff --git a/distributed-event-bus/.idea/compiler.xml b/distributed-event-bus/.idea/compiler.xml new file mode 100644 index 0000000..f84848a --- /dev/null +++ b/distributed-event-bus/.idea/compiler.xml @@ -0,0 +1,16 @@ + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/distributed-event-bus/.idea/jarRepositories.xml b/distributed-event-bus/.idea/jarRepositories.xml new file mode 100644 index 0000000..712ab9d --- /dev/null +++ b/distributed-event-bus/.idea/jarRepositories.xml @@ -0,0 +1,20 @@ + + + + + + + + + + + \ No newline at end of file diff --git a/distributed-event-bus/.idea/libraries/Maven__aopalliance_aopalliance_1_0.xml b/distributed-event-bus/.idea/libraries/Maven__aopalliance_aopalliance_1_0.xml new file mode 100644 index 0000000..30ff5cb --- /dev/null +++ b/distributed-event-bus/.idea/libraries/Maven__aopalliance_aopalliance_1_0.xml @@ -0,0 +1,13 @@ + + + + + + + + + + + + + \ No newline at end of file diff --git a/distributed-event-bus/.idea/libraries/Maven__com_google_code_gson_gson_2_8_6.xml b/distributed-event-bus/.idea/libraries/Maven__com_google_code_gson_gson_2_8_6.xml new file mode 100644 index 0000000..82a9f20 --- /dev/null +++ b/distributed-event-bus/.idea/libraries/Maven__com_google_code_gson_gson_2_8_6.xml @@ -0,0 +1,13 @@ + + + + + + + + + + + + + \ No newline at end of file diff --git a/distributed-event-bus/.idea/libraries/Maven__com_google_guava_guava_16_0_1.xml b/distributed-event-bus/.idea/libraries/Maven__com_google_guava_guava_16_0_1.xml new file mode 100644 index 0000000..b7c7684 --- /dev/null +++ b/distributed-event-bus/.idea/libraries/Maven__com_google_guava_guava_16_0_1.xml @@ -0,0 +1,13 @@ + + + + + + + + + + + + + \ No newline at end of file diff --git a/distributed-event-bus/.idea/libraries/Maven__com_google_inject_guice_4_0.xml b/distributed-event-bus/.idea/libraries/Maven__com_google_inject_guice_4_0.xml new file mode 100644 index 0000000..221b0da --- /dev/null +++ b/distributed-event-bus/.idea/libraries/Maven__com_google_inject_guice_4_0.xml @@ -0,0 +1,13 @@ + + + + + + + + + + + + + \ No newline at end of file diff --git a/distributed-event-bus/.idea/libraries/Maven__javax_inject_javax_inject_1.xml b/distributed-event-bus/.idea/libraries/Maven__javax_inject_javax_inject_1.xml new file mode 100644 index 0000000..93cf65a --- /dev/null +++ b/distributed-event-bus/.idea/libraries/Maven__javax_inject_javax_inject_1.xml @@ -0,0 +1,13 @@ + + + + + + + + + + + + + \ No newline at end of file diff --git a/distributed-event-bus/.idea/libraries/Maven__junit_junit_4_13.xml b/distributed-event-bus/.idea/libraries/Maven__junit_junit_4_13.xml new file mode 100644 index 0000000..59fc5c4 --- /dev/null +++ b/distributed-event-bus/.idea/libraries/Maven__junit_junit_4_13.xml @@ -0,0 +1,13 @@ + + + + + + + + + + + + + \ No newline at end of file diff --git a/distributed-event-bus/.idea/libraries/Maven__org_hamcrest_hamcrest_core_1_3.xml b/distributed-event-bus/.idea/libraries/Maven__org_hamcrest_hamcrest_core_1_3.xml new file mode 100644 index 0000000..f58bbc1 --- /dev/null +++ b/distributed-event-bus/.idea/libraries/Maven__org_hamcrest_hamcrest_core_1_3.xml @@ -0,0 +1,13 @@ + + + + + + + + + + + + + \ No newline at end of file diff --git a/distributed-event-bus/.idea/misc.xml b/distributed-event-bus/.idea/misc.xml new file mode 100644 index 0000000..2e289ef --- /dev/null +++ b/distributed-event-bus/.idea/misc.xml @@ -0,0 +1,13 @@ + + + + + + + + + \ No newline at end of file diff --git a/distributed-event-bus/.idea/modules.xml b/distributed-event-bus/.idea/modules.xml new file mode 100644 index 0000000..0b19bb8 --- /dev/null +++ b/distributed-event-bus/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/distributed-event-bus/.idea/uiDesigner.xml b/distributed-event-bus/.idea/uiDesigner.xml new file mode 100644 index 0000000..e96534f --- /dev/null +++ b/distributed-event-bus/.idea/uiDesigner.xml @@ -0,0 +1,124 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/distributed-event-bus/.idea/vcs.xml b/distributed-event-bus/.idea/vcs.xml new file mode 100644 index 0000000..6c0b863 --- /dev/null +++ b/distributed-event-bus/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/distributed-event-bus/README.ME b/distributed-event-bus/README.ME new file mode 100644 index 0000000..6d79a30 --- /dev/null +++ b/distributed-event-bus/README.ME @@ -0,0 +1,8 @@ +1) Multiple publishers and subscribers (Register from any class to eventbus) +2) Causal ordering of topics +3) Supports configurable retry attempts. +4) Have a dead letter queue. +5) Idempotency on event receiving +6) Allow both pull and push models +7) Allow subscribing from a timestamp or offset +8) Allow preconditions for event subscription \ No newline at end of file diff --git a/distributed-event-bus/distributed-event-bus.iml b/distributed-event-bus/distributed-event-bus.iml new file mode 100644 index 0000000..c03d76f --- /dev/null +++ b/distributed-event-bus/distributed-event-bus.iml @@ -0,0 +1,22 @@ + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/distributed-event-bus/pom.xml b/distributed-event-bus/pom.xml new file mode 100644 index 0000000..b905052 --- /dev/null +++ b/distributed-event-bus/pom.xml @@ -0,0 +1,42 @@ + + + 4.0.0 + + interviewready.io + distributed-event-bus + 1.0 + + + + org.apache.maven.plugins + maven-compiler-plugin + + 11 + 11 + + + + + + + + junit + junit + 4.13 + test + + + com.google.inject + guice + 4.0 + + + com.google.code.gson + gson + 2.8.6 + + + + \ No newline at end of file diff --git a/distributed-event-bus/src/main/java/EventBus.java b/distributed-event-bus/src/main/java/EventBus.java new file mode 100644 index 0000000..581ccbf --- /dev/null +++ b/distributed-event-bus/src/main/java/EventBus.java @@ -0,0 +1,203 @@ +import com.google.inject.Inject; +import com.google.inject.Singleton; +import exceptions.RetryLimitExceededException; +import exceptions.UnsubscribedPollException; +import lib.KeyedExecutor; +import models.Event; +import models.FailureEvent; +import models.Subscription; +import util.Timer; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.*; +import java.util.function.Function; +import java.util.function.Predicate; + +@Singleton +public class EventBus { + private final Map> topics; + private final Map> eventIndexes; + private final Map> eventTimestamps; + private final Map> pullSubscriptions; + private final Map> pushSubscriptions; + private final KeyedExecutor eventExecutor; + private final KeyedExecutor broadcastExecutor; + private EventBus deadLetterQueue; + private final Timer timer; + + @Inject + public EventBus(final KeyedExecutor eventExecutor, final KeyedExecutor broadcastExecutor, final Timer timer) { + this.topics = new ConcurrentHashMap<>(); + this.eventIndexes = new ConcurrentHashMap<>(); + this.eventTimestamps = new ConcurrentHashMap<>(); + this.pullSubscriptions = new ConcurrentHashMap<>(); + this.pushSubscriptions = new ConcurrentHashMap<>(); + this.eventExecutor = eventExecutor; + this.broadcastExecutor = broadcastExecutor; + this.timer = timer; + } + + public void setDeadLetterQueue(final EventBus deadLetterQueue) { + this.deadLetterQueue = deadLetterQueue; + } + + public CompletionStage publish(final String topic, final Event event) { + return eventExecutor.getThreadFor(topic, publishToBus(topic, event)); + } + + private CompletionStage publishToBus(final String topic, final Event event) { + if (eventIndexes.containsKey(topic) && eventIndexes.get(topic).containsKey(event.getId())) { + return null; + } + topics.putIfAbsent(topic, new CopyOnWriteArrayList<>()); + eventIndexes.putIfAbsent(topic, new ConcurrentHashMap<>()); + eventIndexes.get(topic).put(event.getId(), topics.get(topic).size()); + eventTimestamps.putIfAbsent(topic, new ConcurrentSkipListMap<>()); + eventTimestamps.get(topic).put(timer.getCurrentTime(), event.getId()); + topics.get(topic).add(event); + return notifyPushSubscribers(topic, event); + } + + private CompletionStage notifyPushSubscribers(String topic, Event event) { + if (!pushSubscriptions.containsKey(topic)) { + return CompletableFuture.completedStage(null); + } + final var subscribersForTopic = pushSubscriptions.get(topic); + final var notifications = subscribersForTopic.values() + .stream() + .filter(subscription -> subscription.getPrecondition().test(event)) + .map(subscription -> executeEventHandler(event, subscription)) + .toArray(CompletableFuture[]::new); + return CompletableFuture.allOf(notifications); + } + + private CompletionStage executeEventHandler(final Event event, Subscription subscription) { + return broadcastExecutor.getThreadFor(subscription.getTopic() + subscription.getSubscriber(), + doWithRetry(event, subscription.getEventHandler(), + 1, subscription.getNumberOfRetries()) + .exceptionally(throwable -> { + if (deadLetterQueue != null) { + deadLetterQueue.publish(subscription.getTopic(), new FailureEvent(event, throwable, timer.getCurrentTime())); + } + return null; + })); + } + + private CompletionStage doWithRetry(final Event event, + final Function> task, + final int coolDownIntervalInMillis, + final int remainingTries) { + return task.apply(event).handle((__, throwable) -> { + if (throwable != null) { + if (remainingTries == 1) { + throw new RetryLimitExceededException(throwable); + } + try { + Thread.sleep(coolDownIntervalInMillis); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return doWithRetry(event, task, Math.max(coolDownIntervalInMillis * 2, 10), remainingTries - 1); + } else { + return CompletableFuture.completedFuture((Void) null); + } + }).thenCompose(Function.identity()); + } + + + public CompletionStage poll(final String topic, final String subscriber) { + return eventExecutor.getThreadFor(topic + subscriber, () -> pollBus(topic, subscriber)); + } + + private Event pollBus(final String topic, final String subscriber) { + var subscription = pullSubscriptions.getOrDefault(topic, new HashMap<>()).get(subscriber); + if (subscription == null) { + throw new UnsubscribedPollException(); + } + for (var index = subscription.getCurrentIndex(); index.intValue() < topics.get(topic).size(); index.increment()) { + var event = topics.get(topic).get(index.intValue()); + if (subscription.getPrecondition().test(event)) { + index.increment(); + return event; + } + } + return null; + } + + public CompletionStage subscribeToEventsAfter(final String topic, final String subscriber, final long timeStamp) { + return eventExecutor.getThreadFor(topic + subscriber, () -> moveIndexAtTimestamp(topic, subscriber, timeStamp)); + } + + private void moveIndexAtTimestamp(final String topic, final String subscriber, final long timeStamp) { + final var closestEventAfter = eventTimestamps.get(topic).higherEntry(timeStamp); + if (closestEventAfter == null) { + pullSubscriptions.get(topic).get(subscriber).setCurrentIndex(eventIndexes.get(topic).size()); + } else { + final var eventIndex = eventIndexes.get(topic).get(closestEventAfter.getValue()); + pullSubscriptions.get(topic).get(subscriber).setCurrentIndex(eventIndex); + } + } + + public CompletionStage subscribeToEventsAfter(final String topic, final String subscriber, final String eventId) { + return eventExecutor.getThreadFor(topic + subscriber, () -> moveIndexAfterEvent(topic, subscriber, eventId)); + } + + private void moveIndexAfterEvent(final String topic, final String subscriber, final String eventId) { + if (eventId == null) { + pullSubscriptions.get(topic).get(subscriber).setCurrentIndex(0); + } else { + final var eventIndex = eventIndexes.get(topic).get(eventId) + 1; + pullSubscriptions.get(topic).get(subscriber).setCurrentIndex(eventIndex); + } + } + + public CompletionStage subscribeForPush(final String topic, + final String subscriber, + final Predicate precondition, + final Function> handler, + final int numberOfRetries) { + return eventExecutor.getThreadFor(topic + subscriber, + () -> subscribeForPushEvents(topic, subscriber, precondition, handler, numberOfRetries)); + } + + private void subscribeForPushEvents(final String topic, + final String subscriber, + final Predicate precondition, + final Function> handler, + final int numberOfRetries) { + addSubscriber(pushSubscriptions, subscriber, precondition, topic, handler, numberOfRetries); + } + + private void addSubscriber(final Map> pullSubscriptions, + final String subscriber, + final Predicate precondition, + final String topic, + final Function> handler, + final int numberOfRetries) { + pullSubscriptions.putIfAbsent(topic, new ConcurrentHashMap<>()); + final var subscription = new Subscription(topic, subscriber, precondition, handler, numberOfRetries); + subscription.setCurrentIndex(topics.getOrDefault(topic, new ArrayList<>()).size()); + pullSubscriptions.get(topic).put(subscriber, subscription); + } + + public CompletionStage subscribeForPull(final String topic, final String subscriber, final Predicate precondition) { + return eventExecutor.getThreadFor(topic + subscriber, () -> subscribeForPullEvents(topic, subscriber, precondition)); + } + + private void subscribeForPullEvents(final String topic, final String subscriber, final Predicate precondition) { + addSubscriber(pullSubscriptions, subscriber, precondition, topic, null, 0); + } + + public CompletionStage unsubscribe(final String topic, final String subscriber) { + return eventExecutor.getThreadFor(topic + subscriber, () -> unsubscribeFromTopic(topic, subscriber)); + } + + private void unsubscribeFromTopic(final String topic, final String subscriber) { + pushSubscriptions.getOrDefault(topic, new HashMap<>()).remove(subscriber); + pullSubscriptions.getOrDefault(topic, new HashMap<>()).remove(subscriber); + } +} + diff --git a/distributed-event-bus/src/main/java/exceptions/RetryLimitExceededException.java b/distributed-event-bus/src/main/java/exceptions/RetryLimitExceededException.java new file mode 100644 index 0000000..ceb2ae1 --- /dev/null +++ b/distributed-event-bus/src/main/java/exceptions/RetryLimitExceededException.java @@ -0,0 +1,7 @@ +package exceptions; + +public class RetryLimitExceededException extends RuntimeException { + public RetryLimitExceededException(Throwable cause) { + super(cause); + } +} diff --git a/distributed-event-bus/src/main/java/exceptions/UnsubscribedPollException.java b/distributed-event-bus/src/main/java/exceptions/UnsubscribedPollException.java new file mode 100644 index 0000000..3a6e8e8 --- /dev/null +++ b/distributed-event-bus/src/main/java/exceptions/UnsubscribedPollException.java @@ -0,0 +1,4 @@ +package exceptions; + +public class UnsubscribedPollException extends RuntimeException { +} diff --git a/distributed-event-bus/src/main/java/lib/KeyedExecutor.java b/distributed-event-bus/src/main/java/lib/KeyedExecutor.java new file mode 100644 index 0000000..ddc628f --- /dev/null +++ b/distributed-event-bus/src/main/java/lib/KeyedExecutor.java @@ -0,0 +1,31 @@ +package lib; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.function.Function; +import java.util.function.Supplier; + +public class KeyedExecutor { + private final Executor[] executorPool; + + public KeyedExecutor(final int poolSize) { + this.executorPool = new Executor[poolSize]; + for (int i = 0; i < poolSize; i++) { + executorPool[i] = Executors.newSingleThreadExecutor(); + } + } + + public CompletionStage getThreadFor(KEY key, Runnable task) { + return CompletableFuture.runAsync(task, executorPool[Math.abs(key.hashCode() % executorPool.length)]); + } + + public CompletionStage getThreadFor(KEY key, Supplier task) { + return CompletableFuture.supplyAsync(task, executorPool[Math.abs(key.hashCode() % executorPool.length)]); + } + + public CompletionStage getThreadFor(KEY key, CompletionStage task) { + return CompletableFuture.supplyAsync(() -> task, executorPool[Math.abs(key.hashCode() % executorPool.length)]).thenCompose(Function.identity()); + } +} diff --git a/distributed-event-bus/src/main/java/models/Event.java b/distributed-event-bus/src/main/java/models/Event.java new file mode 100644 index 0000000..417deff --- /dev/null +++ b/distributed-event-bus/src/main/java/models/Event.java @@ -0,0 +1,43 @@ +package models; + +import java.util.Objects; +import java.util.UUID; + +public class Event { + private final String id; + private final String publisher; + private final EventType eventType; + private final String description; + private final long creationTime; + + public Event(final String publisher, + final EventType eventType, + final String description, + final long creationTime) { + this.description = description; + this.id = UUID.randomUUID().toString(); + this.publisher = publisher; + this.eventType = eventType; + this.creationTime = creationTime; + } + + public String getId() { + return id; + } + + public String getPublisher() { + return publisher; + } + + public EventType getEventType() { + return eventType; + } + + public long getCreationTime() { + return creationTime; + } + + public String getDescription() { + return description; + } +} \ No newline at end of file diff --git a/distributed-event-bus/src/main/java/models/EventType.java b/distributed-event-bus/src/main/java/models/EventType.java new file mode 100644 index 0000000..4b74232 --- /dev/null +++ b/distributed-event-bus/src/main/java/models/EventType.java @@ -0,0 +1,5 @@ +package models; + +public enum EventType { + PRIORITY, LOGGING, ERROR +} diff --git a/distributed-event-bus/src/main/java/models/FailureEvent.java b/distributed-event-bus/src/main/java/models/FailureEvent.java new file mode 100644 index 0000000..4bf5c86 --- /dev/null +++ b/distributed-event-bus/src/main/java/models/FailureEvent.java @@ -0,0 +1,20 @@ +package models; + +public class FailureEvent extends Event { + private final Event event; + private final Throwable throwable; + + public FailureEvent(Event event, Throwable throwable, long failureTimestamp) { + super("dead-letter-queue", EventType.ERROR, throwable.getMessage(), failureTimestamp); + this.event = event; + this.throwable = throwable; + } + + public Event getEvent() { + return event; + } + + public Throwable getThrowable() { + return throwable; + } +} diff --git a/distributed-event-bus/src/main/java/models/Subscription.java b/distributed-event-bus/src/main/java/models/Subscription.java new file mode 100644 index 0000000..f23c11a --- /dev/null +++ b/distributed-event-bus/src/main/java/models/Subscription.java @@ -0,0 +1,57 @@ +package models; + +import java.util.concurrent.CompletionStage; +import java.util.concurrent.atomic.LongAdder; +import java.util.function.Function; +import java.util.function.Predicate; + +public class Subscription { + private final String topic; + private final String subscriber; + private final Predicate precondition; + private final Function> eventHandler; + private final int numberOfRetries; + private final LongAdder currentIndex; + + public Subscription(final String topic, + final String subscriber, + final Predicate precondition, + final Function> eventHandler, + final int numberOfRetries) { + this.topic = topic; + this.subscriber = subscriber; + this.precondition = precondition; + this.eventHandler = eventHandler; + this.currentIndex = new LongAdder(); + this.numberOfRetries = numberOfRetries; + } + + public String getTopic() { + return topic; + } + + public String getSubscriber() { + return subscriber; + } + + public Predicate getPrecondition() { + return precondition; + } + + public Function> getEventHandler() { + return eventHandler; + } + + public LongAdder getCurrentIndex() { + return currentIndex; + } + + public void setCurrentIndex(final int offset) { + currentIndex.reset(); + currentIndex.add(offset); + } + + public int getNumberOfRetries() { + return numberOfRetries; + } +} diff --git a/distributed-event-bus/src/main/java/util/Timer.java b/distributed-event-bus/src/main/java/util/Timer.java new file mode 100644 index 0000000..4e15d88 --- /dev/null +++ b/distributed-event-bus/src/main/java/util/Timer.java @@ -0,0 +1,10 @@ +package util; + +import com.google.inject.Singleton; + +@Singleton +public class Timer { + public long getCurrentTime() { + return System.nanoTime(); + } +} diff --git a/distributed-event-bus/src/main/resources/application.properties b/distributed-event-bus/src/main/resources/application.properties new file mode 100644 index 0000000..e69de29 diff --git a/distributed-event-bus/src/test/java/EventBusTest.java b/distributed-event-bus/src/test/java/EventBusTest.java new file mode 100644 index 0000000..45fccea --- /dev/null +++ b/distributed-event-bus/src/test/java/EventBusTest.java @@ -0,0 +1,340 @@ +import com.google.gson.Gson; +import exceptions.RetryLimitExceededException; +import exceptions.UnsubscribedPollException; +import lib.KeyedExecutor; +import models.Event; +import models.EventType; +import models.FailureEvent; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import util.Timer; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; + + +// Causal ordering of topics + +public class EventBusTest { + public static final String TOPIC_1 = "topic-1"; + public static final String TOPIC_2 = "topic-2"; + public static final String PUBLISHER_1 = "publisher-1"; + public static final String SUBSCRIBER_1 = "subscriber-1"; + public static final String SUBSCRIBER_2 = "subscriber-2"; + private Timer timer; + private KeyedExecutor keyedExecutor; + private KeyedExecutor broadcastExecutor; + + @Before + public void setUp() { + keyedExecutor = new KeyedExecutor<>(16); + broadcastExecutor = new KeyedExecutor<>(16); + timer = new Timer(); + } + + private Event constructEvent(EventType priority, String description) { + return new Event(PUBLISHER_1, priority, description, timer.getCurrentTime()); + } + + @Test + public void defaultBehavior() { + final EventBus eventBus = new EventBus(keyedExecutor, broadcastExecutor, timer); + eventBus.publish(TOPIC_1, constructEvent(EventType.LOGGING, "first event")); + eventBus.subscribeForPull(TOPIC_1, SUBSCRIBER_1, (event) -> true).toCompletableFuture().join(); + Assert.assertNull(eventBus.poll(TOPIC_1, SUBSCRIBER_1).toCompletableFuture().join()); + + eventBus.publish(TOPIC_1, constructEvent(EventType.PRIORITY, "second event")); + final Event secondEvent = eventBus.poll(TOPIC_1, SUBSCRIBER_1).toCompletableFuture().join(); + + Assert.assertEquals(EventType.PRIORITY, secondEvent.getEventType()); + Assert.assertEquals("second event", secondEvent.getDescription()); + + eventBus.subscribeToEventsAfter(TOPIC_1, SUBSCRIBER_1, null).toCompletableFuture().join(); + final Event firstEvent = eventBus.poll(TOPIC_1, SUBSCRIBER_1).toCompletableFuture().join(); + + Assert.assertEquals(EventType.LOGGING, firstEvent.getEventType()); + Assert.assertEquals("first event", firstEvent.getDescription()); + Assert.assertEquals(PUBLISHER_1, firstEvent.getPublisher()); + + final List eventCollector = new ArrayList<>(); + eventBus.subscribeForPush(TOPIC_1, + SUBSCRIBER_2, + (event) -> true, + (event) -> CompletableFuture.runAsync(() -> eventCollector.add(event)), + 0).toCompletableFuture().join(); + eventBus.publish(TOPIC_1, constructEvent(EventType.ERROR, "third event")).toCompletableFuture().join(); + + Assert.assertEquals(EventType.ERROR, eventCollector.get(0).getEventType()); + Assert.assertEquals("third event", eventCollector.get(0).getDescription()); + + eventBus.unsubscribe(TOPIC_1, SUBSCRIBER_1).toCompletableFuture().join(); + eventBus.publish(TOPIC_1, constructEvent(EventType.LOGGING, "fourth event")).toCompletableFuture().join(); + Assert.assertTrue(eventBus.poll(TOPIC_1, SUBSCRIBER_1) + .handle((__, throwable) -> throwable.getCause() instanceof UnsubscribedPollException) + .toCompletableFuture().join()); + + eventCollector.clear(); + eventBus.unsubscribe(TOPIC_1, SUBSCRIBER_2).toCompletableFuture().join(); + eventBus.publish(TOPIC_1, constructEvent(EventType.LOGGING, "fifth event")).toCompletableFuture().join(); + Assert.assertTrue(eventCollector.isEmpty()); + } + + @Test + public void indexMove() { + final EventBus eventBus = new EventBus(keyedExecutor, broadcastExecutor, timer); + eventBus.subscribeForPull(TOPIC_1, SUBSCRIBER_1, (event) -> true).toCompletableFuture().join(); + final Event firstEvent = constructEvent(EventType.PRIORITY, "first event"); + final Event secondEvent = constructEvent(EventType.PRIORITY, "second event"); + final Event thirdEvent = constructEvent(EventType.PRIORITY, "third event"); + eventBus.publish(TOPIC_1, firstEvent).toCompletableFuture().join(); + eventBus.publish(TOPIC_1, secondEvent).toCompletableFuture().join(); + eventBus.publish(TOPIC_1, thirdEvent).toCompletableFuture().join(); + + eventBus.subscribeToEventsAfter(TOPIC_1, SUBSCRIBER_1, secondEvent.getId()).toCompletableFuture().join(); + final Event firstPoll = eventBus.poll(TOPIC_1, SUBSCRIBER_1).toCompletableFuture().join(); + Assert.assertEquals("third event", firstPoll.getDescription()); + Assert.assertNull(eventBus.poll(TOPIC_1, SUBSCRIBER_1).toCompletableFuture().join()); + + eventBus.subscribeToEventsAfter(TOPIC_1, SUBSCRIBER_1, null).toCompletableFuture().join(); + final Event secondPoll = eventBus.poll(TOPIC_1, SUBSCRIBER_1).toCompletableFuture().join(); + Assert.assertEquals("first event", secondPoll.getDescription()); + + eventBus.subscribeToEventsAfter(TOPIC_1, SUBSCRIBER_1, firstEvent.getId()).toCompletableFuture().join(); + final Event thirdPoll = eventBus.poll(TOPIC_1, SUBSCRIBER_1).toCompletableFuture().join(); + Assert.assertEquals("second event", thirdPoll.getDescription()); + + eventBus.subscribeToEventsAfter(TOPIC_1, SUBSCRIBER_1, thirdEvent.getId()).toCompletableFuture().join(); + Assert.assertNull(eventBus.poll(TOPIC_1, SUBSCRIBER_1).toCompletableFuture().join()); + } + + @Test + public void timestampMove() { + final TestTimer timer = new TestTimer(); + final EventBus eventBus = new EventBus(keyedExecutor, broadcastExecutor, timer); + eventBus.subscribeForPull(TOPIC_1, SUBSCRIBER_1, (event) -> true).toCompletableFuture().join(); + + final Event firstEvent = new Event(PUBLISHER_1, EventType.PRIORITY, "first event", timer.getCurrentTime()); + eventBus.publish(TOPIC_1, firstEvent).toCompletableFuture().join(); + timer.setCurrentTime(timer.getCurrentTime() + Duration.ofSeconds(10).toNanos()); + + final Event secondEvent = new Event(PUBLISHER_1, EventType.PRIORITY, "second event", timer.getCurrentTime()); + eventBus.publish(TOPIC_1, secondEvent).toCompletableFuture().join(); + timer.setCurrentTime(timer.getCurrentTime() + Duration.ofSeconds(10).toNanos()); + + final Event thirdEvent = new Event(PUBLISHER_1, EventType.PRIORITY, "third event", timer.getCurrentTime()); + eventBus.publish(TOPIC_1, thirdEvent).toCompletableFuture().join(); + + eventBus.subscribeToEventsAfter(TOPIC_1, SUBSCRIBER_1, secondEvent.getCreationTime() + Duration.ofSeconds(5).toNanos()).toCompletableFuture().join(); + final Event firstPoll = eventBus.poll(TOPIC_1, SUBSCRIBER_1).toCompletableFuture().join(); + Assert.assertEquals("third event", firstPoll.getDescription()); + Assert.assertNull(eventBus.poll(TOPIC_1, SUBSCRIBER_1).toCompletableFuture().join()); + + eventBus.subscribeToEventsAfter(TOPIC_1, SUBSCRIBER_1, 0).toCompletableFuture().join(); + final Event secondPoll = eventBus.poll(TOPIC_1, SUBSCRIBER_1).toCompletableFuture().join(); + Assert.assertEquals("first event", secondPoll.getDescription()); + + eventBus.subscribeToEventsAfter(TOPIC_1, SUBSCRIBER_1, firstEvent.getCreationTime() + Duration.ofSeconds(5).toNanos()).toCompletableFuture().join(); + final Event thirdPoll = eventBus.poll(TOPIC_1, SUBSCRIBER_1).toCompletableFuture().join(); + Assert.assertEquals("second event", thirdPoll.getDescription()); + + eventBus.subscribeToEventsAfter(TOPIC_1, SUBSCRIBER_1, thirdEvent.getCreationTime() + Duration.ofNanos(1).toNanos()).toCompletableFuture().join(); + Assert.assertNull(eventBus.poll(TOPIC_1, SUBSCRIBER_1).toCompletableFuture().join()); + } + + @Test + public void idempotency() { + final EventBus eventBus = new EventBus(keyedExecutor, broadcastExecutor, timer); + eventBus.subscribeForPull(TOPIC_1, SUBSCRIBER_1, (event) -> true).toCompletableFuture().join(); + Event event1 = new Gson().fromJson("{\n" + + " \"id\": \"event-5435\",\n" + + " \"publisher\": \"random-publisher-1\",\n" + + " \"eventType\": \"LOGGING\",\n" + + " \"description\": \"random-event-1\",\n" + + " \"creationTime\": 31884739810179363\n" + + "}", Event.class); + eventBus.publish(TOPIC_1, event1); + + Event event2 = new Gson().fromJson("{\n" + + " \"id\": \"event-5435\",\n" + + " \"publisher\": \"random-publisher-2\",\n" + + " \"eventType\": \"PRIORITY\",\n" + + " \"description\": \"random-event-2\",\n" + + " \"creationTime\": 31824735510179363\n" + + "}", Event.class); + eventBus.publish(TOPIC_1, event2); + + + final Event firstEvent = eventBus.poll(TOPIC_1, SUBSCRIBER_1).toCompletableFuture().join(); + Assert.assertEquals(EventType.LOGGING, firstEvent.getEventType()); + Assert.assertEquals("random-event-1", firstEvent.getDescription()); + Assert.assertNull(eventBus.poll(TOPIC_1, SUBSCRIBER_1).toCompletableFuture().join()); + } + + @Test + public void unsubscribePushEvents() { + final EventBus eventBus = new EventBus(keyedExecutor, broadcastExecutor, timer); + final List topic1 = new ArrayList<>(), topic2 = new ArrayList<>(); + eventBus.subscribeForPush(TOPIC_1, SUBSCRIBER_1, event -> true, event -> { + topic1.add(event); + return CompletableFuture.completedStage(null); + }, 0).toCompletableFuture().join(); + eventBus.subscribeForPush(TOPIC_2, SUBSCRIBER_1, event -> true, event -> { + topic2.add(event); + return CompletableFuture.completedStage(null); + }, 0).toCompletableFuture().join(); + + for (int i = 0; i < 3; i++) { + eventBus.publish(TOPIC_1, constructEvent(EventType.PRIORITY, UUID.randomUUID().toString())).toCompletableFuture().join(); + } + eventBus.publish(TOPIC_2, constructEvent(EventType.PRIORITY, UUID.randomUUID().toString())).toCompletableFuture().join(); + eventBus.unsubscribe(TOPIC_1, SUBSCRIBER_1); + Assert.assertEquals(3, topic1.size()); + Assert.assertEquals(1, topic2.size()); + + for (int i = 0; i < 2; i++) { + eventBus.publish(TOPIC_2, constructEvent(EventType.PRIORITY, UUID.randomUUID().toString())).toCompletableFuture().join(); + } + for (int i = 0; i < 3; i++) { + eventBus.publish(TOPIC_1, constructEvent(EventType.PRIORITY, UUID.randomUUID().toString())).toCompletableFuture().join(); + } + Assert.assertEquals(3, topic1.size()); + Assert.assertEquals(3, topic2.size()); + + eventBus.subscribeForPush(TOPIC_1, SUBSCRIBER_1, event -> true, event -> { + topic1.add(event); + return CompletableFuture.completedStage(null); + }, 0).toCompletableFuture().join(); + for (int i = 0; i < 3; i++) { + eventBus.publish(TOPIC_1, constructEvent(EventType.PRIORITY, UUID.randomUUID().toString())).toCompletableFuture().join(); + } + Assert.assertEquals(6, topic1.size()); + Assert.assertEquals(3, topic2.size()); + } + + @Test + public void unsubscribePullEvents() { + final EventBus eventBus = new EventBus(keyedExecutor, broadcastExecutor, timer); + eventBus.subscribeForPull(TOPIC_1, SUBSCRIBER_1, event -> true).toCompletableFuture().join(); + eventBus.subscribeForPull(TOPIC_2, SUBSCRIBER_1, event -> true).toCompletableFuture().join(); + for (int i = 0; i < 3; i++) { + eventBus.publish(TOPIC_1, constructEvent(EventType.PRIORITY, UUID.randomUUID().toString())).toCompletableFuture().join(); + } + eventBus.publish(TOPIC_2, constructEvent(EventType.PRIORITY, UUID.randomUUID().toString())).toCompletableFuture().join(); + + for (int i = 0; i < 3; i++) { + Assert.assertNotNull(eventBus.poll(TOPIC_1, SUBSCRIBER_1).toCompletableFuture().join()); + } + Assert.assertNotNull(eventBus.poll(TOPIC_2, SUBSCRIBER_1).toCompletableFuture().join()); + + eventBus.unsubscribe(TOPIC_1, SUBSCRIBER_1).toCompletableFuture().join(); + for (int i = 0; i < 2; i++) { + eventBus.publish(TOPIC_2, constructEvent(EventType.PRIORITY, UUID.randomUUID().toString())).toCompletableFuture().join(); + } + for (int i = 0; i < 3; i++) { + eventBus.publish(TOPIC_1, constructEvent(EventType.PRIORITY, UUID.randomUUID().toString())).toCompletableFuture().join(); + } + + Assert.assertTrue(eventBus.poll(TOPIC_1, SUBSCRIBER_1) + .handle((__, throwable) -> throwable.getCause() instanceof UnsubscribedPollException).toCompletableFuture().join()); + for (int i = 0; i < 2; i++) { + Assert.assertNotNull(eventBus.poll(TOPIC_2, SUBSCRIBER_1).toCompletableFuture().join()); + } + + eventBus.subscribeForPull(TOPIC_1, SUBSCRIBER_1, event -> true).toCompletableFuture().join(); + for (int i = 0; i < 3; i++) { + eventBus.publish(TOPIC_1, constructEvent(EventType.PRIORITY, UUID.randomUUID().toString())).toCompletableFuture().join(); + } + + for (int i = 0; i < 3; i++) { + Assert.assertNotNull(eventBus.poll(TOPIC_1, SUBSCRIBER_1).toCompletableFuture().join()); + } + Assert.assertNull(eventBus.poll(TOPIC_2, SUBSCRIBER_1).toCompletableFuture().join()); + } + + @Test + public void deadLetterQueue() { + final EventBus eventBus = new EventBus(keyedExecutor, broadcastExecutor, timer); + final EventBus dlq = new EventBus(new KeyedExecutor<>(3), new KeyedExecutor<>(3), new Timer()); + eventBus.setDeadLetterQueue(dlq); + dlq.subscribeForPull(TOPIC_1, SUBSCRIBER_1, event -> event.getEventType().equals(EventType.ERROR)); + final AtomicLong attempts = new AtomicLong(); + final int maxTries = 5; + eventBus.subscribeForPush(TOPIC_1, SUBSCRIBER_1, event -> true, event -> { + attempts.incrementAndGet(); + return CompletableFuture.failedStage(new RuntimeException()); + }, maxTries).toCompletableFuture().join(); + final Event event = new Event(PUBLISHER_1, EventType.LOGGING, "random", timer.getCurrentTime()); + eventBus.publish(TOPIC_1, event).toCompletableFuture().join(); + Assert.assertEquals(5, attempts.intValue()); + final Event failureEvent = dlq.poll(TOPIC_1, SUBSCRIBER_1).toCompletableFuture().join(); + Assert.assertTrue(failureEvent instanceof FailureEvent); + Assert.assertEquals(event.getId(), ((FailureEvent) failureEvent).getEvent().getId()); + Assert.assertEquals(EventType.ERROR, failureEvent.getEventType()); + Assert.assertTrue(((FailureEvent) failureEvent).getThrowable().getCause() instanceof RetryLimitExceededException); + } + + @Test + public void retrySuccess() { + final EventBus eventBus = new EventBus(keyedExecutor, broadcastExecutor, timer); + final AtomicLong attempts = new AtomicLong(); + final int maxTries = 5; + final List events = new ArrayList<>(); + eventBus.subscribeForPush(TOPIC_1, SUBSCRIBER_1, event -> true, event -> { + if (attempts.incrementAndGet() == maxTries) { + events.add(event); + return CompletableFuture.completedStage(null); + } else { + return CompletableFuture.failedStage(new RuntimeException("TRY no: " + attempts.intValue())); + } + }, maxTries).toCompletableFuture().join(); + eventBus.publish(TOPIC_1, new Event(PUBLISHER_1, EventType.LOGGING, "random", timer.getCurrentTime())).toCompletableFuture().join(); + + Assert.assertEquals(EventType.LOGGING, events.get(0).getEventType()); + Assert.assertEquals("random", events.get(0).getDescription()); + Assert.assertEquals(5, attempts.intValue()); + Assert.assertEquals(1, events.size()); + } + + @Test + public void preconditionCheckForPush() { + final EventBus eventBus = new EventBus(keyedExecutor, broadcastExecutor, timer); + final List events = new ArrayList<>(); + eventBus.subscribeForPush(TOPIC_1, SUBSCRIBER_1, event -> event.getDescription().contains("-1"), event -> { + events.add(event); + return CompletableFuture.completedStage(null); + }, 0).toCompletableFuture().join(); + eventBus.publish(TOPIC_1, new Event(PUBLISHER_1, EventType.LOGGING, "random-event-1", timer.getCurrentTime())).toCompletableFuture().join(); + eventBus.publish(TOPIC_1, new Event(PUBLISHER_1, EventType.LOGGING, "random-event-2", timer.getCurrentTime())).toCompletableFuture().join(); + eventBus.publish(TOPIC_1, new Event(PUBLISHER_1, EventType.LOGGING, "random-event-12", timer.getCurrentTime())).toCompletableFuture().join(); + eventBus.publish(TOPIC_1, new Event(PUBLISHER_1, EventType.LOGGING, "random-event-21", timer.getCurrentTime())).toCompletableFuture().join(); + + Assert.assertEquals(events.size(), 2); + Assert.assertEquals(EventType.LOGGING, events.get(0).getEventType()); + Assert.assertEquals("random-event-1", events.get(0).getDescription()); + Assert.assertEquals(EventType.LOGGING, events.get(1).getEventType()); + Assert.assertEquals("random-event-12", events.get(1).getDescription()); + } + + @Test + public void preconditionCheckForPull() { + final EventBus eventBus = new EventBus(keyedExecutor, broadcastExecutor, timer); + eventBus.subscribeForPull(TOPIC_1, SUBSCRIBER_1, event -> event.getDescription().contains("-1")).toCompletableFuture().join(); + eventBus.publish(TOPIC_1, new Event(PUBLISHER_1, EventType.LOGGING, "random-event-1", timer.getCurrentTime())).toCompletableFuture().join(); + eventBus.publish(TOPIC_1, new Event(PUBLISHER_1, EventType.LOGGING, "random-event-2", timer.getCurrentTime())).toCompletableFuture().join(); + eventBus.publish(TOPIC_1, new Event(PUBLISHER_1, EventType.LOGGING, "random-event-12", timer.getCurrentTime())).toCompletableFuture().join(); + eventBus.publish(TOPIC_1, new Event(PUBLISHER_1, EventType.LOGGING, "random-event-21", timer.getCurrentTime())).toCompletableFuture().join(); + + final Event event1 = eventBus.poll(TOPIC_1, SUBSCRIBER_1).toCompletableFuture().join(); + Assert.assertEquals(EventType.LOGGING, event1.getEventType()); + Assert.assertEquals("random-event-1", event1.getDescription()); + final Event event2 = eventBus.poll(TOPIC_1, SUBSCRIBER_1).toCompletableFuture().join(); + Assert.assertEquals(EventType.LOGGING, event2.getEventType()); + Assert.assertEquals("random-event-12", event2.getDescription()); + Assert.assertNull(eventBus.poll(TOPIC_1, SUBSCRIBER_1).toCompletableFuture().join()); + } +} diff --git a/distributed-event-bus/src/test/java/TestTimer.java b/distributed-event-bus/src/test/java/TestTimer.java new file mode 100644 index 0000000..b6d818f --- /dev/null +++ b/distributed-event-bus/src/test/java/TestTimer.java @@ -0,0 +1,18 @@ +import util.Timer; + +public class TestTimer extends Timer { + private long currentTime; + + public TestTimer() { + this.currentTime = System.nanoTime(); + } + + @Override + public long getCurrentTime() { + return currentTime; + } + + public void setCurrentTime(final long currentTime) { + this.currentTime = currentTime; + } +} diff --git a/distributed-event-bus/target/classes/EventBus.class b/distributed-event-bus/target/classes/EventBus.class new file mode 100644 index 0000000..94f6ec7 Binary files /dev/null and b/distributed-event-bus/target/classes/EventBus.class differ diff --git a/distributed-event-bus/target/classes/META-INF/distributed-event-bus.kotlin_module b/distributed-event-bus/target/classes/META-INF/distributed-event-bus.kotlin_module new file mode 100644 index 0000000..a49347a Binary files /dev/null and b/distributed-event-bus/target/classes/META-INF/distributed-event-bus.kotlin_module differ diff --git a/distributed-event-bus/target/classes/application.properties b/distributed-event-bus/target/classes/application.properties new file mode 100644 index 0000000..e69de29 diff --git a/distributed-event-bus/target/classes/exceptions/RetryLimitExceededException.class b/distributed-event-bus/target/classes/exceptions/RetryLimitExceededException.class new file mode 100644 index 0000000..330a2c8 Binary files /dev/null and b/distributed-event-bus/target/classes/exceptions/RetryLimitExceededException.class differ diff --git a/distributed-event-bus/target/classes/exceptions/UnsubscribedPollException.class b/distributed-event-bus/target/classes/exceptions/UnsubscribedPollException.class new file mode 100644 index 0000000..895393f Binary files /dev/null and b/distributed-event-bus/target/classes/exceptions/UnsubscribedPollException.class differ diff --git a/distributed-event-bus/target/classes/lib/KeyedExecutor.class b/distributed-event-bus/target/classes/lib/KeyedExecutor.class new file mode 100644 index 0000000..aedf966 Binary files /dev/null and b/distributed-event-bus/target/classes/lib/KeyedExecutor.class differ diff --git a/distributed-event-bus/target/classes/models/Event.class b/distributed-event-bus/target/classes/models/Event.class new file mode 100644 index 0000000..93516c1 Binary files /dev/null and b/distributed-event-bus/target/classes/models/Event.class differ diff --git a/distributed-event-bus/target/classes/models/EventType.class b/distributed-event-bus/target/classes/models/EventType.class new file mode 100644 index 0000000..4b86a61 Binary files /dev/null and b/distributed-event-bus/target/classes/models/EventType.class differ diff --git a/distributed-event-bus/target/classes/models/FailureEvent.class b/distributed-event-bus/target/classes/models/FailureEvent.class new file mode 100644 index 0000000..0911ae2 Binary files /dev/null and b/distributed-event-bus/target/classes/models/FailureEvent.class differ diff --git a/distributed-event-bus/target/classes/models/Subscription.class b/distributed-event-bus/target/classes/models/Subscription.class new file mode 100644 index 0000000..49dd2c0 Binary files /dev/null and b/distributed-event-bus/target/classes/models/Subscription.class differ diff --git a/distributed-event-bus/target/classes/util/Timer.class b/distributed-event-bus/target/classes/util/Timer.class new file mode 100644 index 0000000..40f6d1a Binary files /dev/null and b/distributed-event-bus/target/classes/util/Timer.class differ diff --git a/distributed-event-bus/target/test-classes/EventBusTest.class b/distributed-event-bus/target/test-classes/EventBusTest.class new file mode 100644 index 0000000..61143cb Binary files /dev/null and b/distributed-event-bus/target/test-classes/EventBusTest.class differ diff --git a/distributed-event-bus/target/test-classes/TestTimer.class b/distributed-event-bus/target/test-classes/TestTimer.class new file mode 100644 index 0000000..3ca66a5 Binary files /dev/null and b/distributed-event-bus/target/test-classes/TestTimer.class differ diff --git a/rate-limiter/.idea/.gitignore b/rate-limiter/.idea/.gitignore new file mode 100644 index 0000000..26d3352 --- /dev/null +++ b/rate-limiter/.idea/.gitignore @@ -0,0 +1,3 @@ +# Default ignored files +/shelf/ +/workspace.xml diff --git a/rate-limiter/.idea/compiler.xml b/rate-limiter/.idea/compiler.xml new file mode 100644 index 0000000..e7c7cd8 --- /dev/null +++ b/rate-limiter/.idea/compiler.xml @@ -0,0 +1,16 @@ + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/rate-limiter/.idea/jarRepositories.xml b/rate-limiter/.idea/jarRepositories.xml new file mode 100644 index 0000000..712ab9d --- /dev/null +++ b/rate-limiter/.idea/jarRepositories.xml @@ -0,0 +1,20 @@ + + + + + + + + + + + \ No newline at end of file diff --git a/rate-limiter/.idea/libraries/Maven__junit_junit_4_13.xml b/rate-limiter/.idea/libraries/Maven__junit_junit_4_13.xml new file mode 100644 index 0000000..59fc5c4 --- /dev/null +++ b/rate-limiter/.idea/libraries/Maven__junit_junit_4_13.xml @@ -0,0 +1,13 @@ + + + + + + + + + + + + + \ No newline at end of file diff --git a/rate-limiter/.idea/libraries/Maven__org_hamcrest_hamcrest_core_1_3.xml b/rate-limiter/.idea/libraries/Maven__org_hamcrest_hamcrest_core_1_3.xml new file mode 100644 index 0000000..f58bbc1 --- /dev/null +++ b/rate-limiter/.idea/libraries/Maven__org_hamcrest_hamcrest_core_1_3.xml @@ -0,0 +1,13 @@ + + + + + + + + + + + + + \ No newline at end of file diff --git a/rate-limiter/.idea/misc.xml b/rate-limiter/.idea/misc.xml new file mode 100644 index 0000000..2e289ef --- /dev/null +++ b/rate-limiter/.idea/misc.xml @@ -0,0 +1,13 @@ + + + + + + + + + \ No newline at end of file diff --git a/rate-limiter/.idea/modules.xml b/rate-limiter/.idea/modules.xml new file mode 100644 index 0000000..9fd586a --- /dev/null +++ b/rate-limiter/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/rate-limiter/.idea/uiDesigner.xml b/rate-limiter/.idea/uiDesigner.xml new file mode 100644 index 0000000..e96534f --- /dev/null +++ b/rate-limiter/.idea/uiDesigner.xml @@ -0,0 +1,124 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/rate-limiter/.idea/vcs.xml b/rate-limiter/.idea/vcs.xml new file mode 100644 index 0000000..6c0b863 --- /dev/null +++ b/rate-limiter/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/rate-limiter/pom.xml b/rate-limiter/pom.xml new file mode 100644 index 0000000..af18a0d --- /dev/null +++ b/rate-limiter/pom.xml @@ -0,0 +1,31 @@ + + + 4.0.0 + + interviewready.io + rate-limiter + 1.0 + + + + org.apache.maven.plugins + maven-compiler-plugin + + 10 + 10 + + + + + + + + junit + junit + 4.13 + test + + + \ No newline at end of file diff --git a/rate-limiter/rate-limiter.iml b/rate-limiter/rate-limiter.iml new file mode 100644 index 0000000..da9b2a5 --- /dev/null +++ b/rate-limiter/rate-limiter.iml @@ -0,0 +1,17 @@ + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/rate-limiter/src/main/java/TimerWheel.java b/rate-limiter/src/main/java/TimerWheel.java new file mode 100644 index 0000000..735b924 --- /dev/null +++ b/rate-limiter/src/main/java/TimerWheel.java @@ -0,0 +1,77 @@ +import exceptions.RateLimitExceededException; +import models.Request; +import utils.Timer; + +import java.util.Map; +import java.util.concurrent.*; + +public class TimerWheel { + private final int timeOutPeriod; + private final int capacityPerSlot; + private final TimeUnit timeUnit; + private final ArrayBlockingQueue[] slots; + private final Map reverseIndex; + private final Timer timer; + private final ExecutorService[] threads; + + public TimerWheel(final TimeUnit timeUnit, + final int timeOutPeriod, + final int capacityPerSlot, + final Timer timer) { + this.timeUnit = timeUnit; + this.timeOutPeriod = timeOutPeriod; + this.capacityPerSlot = capacityPerSlot; + if (this.timeOutPeriod > 1000) { + throw new IllegalArgumentException(); + } + this.slots = new ArrayBlockingQueue[this.timeOutPeriod]; + this.threads = new ExecutorService[this.timeOutPeriod]; + this.reverseIndex = new ConcurrentHashMap<>(); + for (int i = 0; i < slots.length; i++) { + slots[i] = new ArrayBlockingQueue<>(capacityPerSlot); + threads[i] = Executors.newSingleThreadExecutor(); + } + this.timer = timer; + final long timePerSlot = TimeUnit.MILLISECONDS.convert(1, timeUnit); + Executors.newSingleThreadScheduledExecutor() + .scheduleAtFixedRate(this::flushRequests, + timePerSlot - (this.timer.getCurrentTimeInMillis() % timePerSlot), + timePerSlot, TimeUnit.MILLISECONDS); + } + + public Future flushRequests() { + final int currentSlot = getCurrentSlot(); + return threads[currentSlot].submit(() -> { + for (final Request request : slots[currentSlot]) { + if (timer.getCurrentTime(timeUnit) - request.getStartTime() >= timeOutPeriod) { + slots[currentSlot].remove(request); + reverseIndex.remove(request.getRequestId()); + } + } + }); + } + + public Future addRequest(final Request request) { + final int currentSlot = getCurrentSlot(); + return threads[currentSlot].submit(() -> { + if (slots[currentSlot].size() >= capacityPerSlot) { + throw new RateLimitExceededException(); + } + slots[currentSlot].add(request); + reverseIndex.put(request.getRequestId(), currentSlot); + }); + } + + public Future evict(final String requestId) { + final int currentSlot = reverseIndex.get(requestId); + return threads[currentSlot].submit(() -> { + slots[currentSlot].remove(new Request(requestId, 0)); + reverseIndex.remove(requestId); + }); + } + + private int getCurrentSlot() { + return (int) timer.getCurrentTime(timeUnit) % slots.length; + } +} + diff --git a/rate-limiter/src/main/java/exceptions/RateLimitExceededException.java b/rate-limiter/src/main/java/exceptions/RateLimitExceededException.java new file mode 100644 index 0000000..3c82f38 --- /dev/null +++ b/rate-limiter/src/main/java/exceptions/RateLimitExceededException.java @@ -0,0 +1,7 @@ +package exceptions; + +public class RateLimitExceededException extends IllegalStateException { + public RateLimitExceededException() { + super("Rate limit exceeded"); + } +} diff --git a/rate-limiter/src/main/java/models/Request.java b/rate-limiter/src/main/java/models/Request.java new file mode 100644 index 0000000..467dde9 --- /dev/null +++ b/rate-limiter/src/main/java/models/Request.java @@ -0,0 +1,33 @@ +package models; + +import java.util.Objects; + +public class Request { + private final String requestId; + private final long startTime; + + public Request(String requestId, long startTime) { + this.requestId = requestId; + this.startTime = startTime; + } + + public String getRequestId() { + return requestId; + } + + public long getStartTime() { + return startTime; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + return requestId.equals(((Request) o).requestId); + } + + @Override + public int hashCode() { + return requestId.hashCode(); + } +} diff --git a/rate-limiter/src/main/java/utils/Timer.java b/rate-limiter/src/main/java/utils/Timer.java new file mode 100644 index 0000000..4545d91 --- /dev/null +++ b/rate-limiter/src/main/java/utils/Timer.java @@ -0,0 +1,13 @@ +package utils; + +import java.util.concurrent.TimeUnit; + +public class Timer { + public long getCurrentTime(final TimeUnit timeUnit) { + return timeUnit.convert(getCurrentTimeInMillis(), TimeUnit.MILLISECONDS); + } + + public long getCurrentTimeInMillis() { + return System.currentTimeMillis(); + } +} diff --git a/rate-limiter/src/test/java/RateLimitTest.java b/rate-limiter/src/test/java/RateLimitTest.java new file mode 100644 index 0000000..8cf3098 --- /dev/null +++ b/rate-limiter/src/test/java/RateLimitTest.java @@ -0,0 +1,74 @@ +import models.Request; +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +public class RateLimitTest { + + @Test + public void testDefaultBehaviour() throws Exception { + final TimeUnit timeUnit = TimeUnit.SECONDS; + final TestTimer timer = new TestTimer(); + final TimerWheel timerWheel = new TimerWheel(timeUnit, 6, 3, timer); + timerWheel.addRequest(new Request("1", timer.getCurrentTime(timeUnit))).get(); + timerWheel.addRequest(new Request("2", timer.getCurrentTime(timeUnit))).get(); + timerWheel.addRequest(new Request("3", timer.getCurrentTime(timeUnit))).get(); + Throwable exception = null; + try { + timerWheel.addRequest(new Request("4", timer.getCurrentTime(timeUnit))).get(); + } catch (Exception e) { + exception = e.getCause(); + } + Assert.assertNotNull(exception); + Assert.assertEquals("Rate limit exceeded", exception.getMessage()); + tick(timeUnit, timer, timerWheel); + timerWheel.addRequest(new Request("4", timer.getCurrentTime(timeUnit))).get(); + timerWheel.addRequest(new Request("5", timer.getCurrentTime(timeUnit))).get(); + timerWheel.evict("1").get(); + timerWheel.evict("4").get(); + timerWheel.addRequest(new Request("6", timer.getCurrentTime(timeUnit))).get(); + timerWheel.addRequest(new Request("7", timer.getCurrentTime(timeUnit))).get(); + } + + @Test + public void testClearing() throws Exception { + final TimeUnit timeUnit = TimeUnit.SECONDS; + final TestTimer timer = new TestTimer(); + final int timeOutPeriod = 6; + final TimerWheel timerWheel = new TimerWheel(timeUnit, timeOutPeriod, 3, timer); + timerWheel.addRequest(new Request("0", timer.getCurrentTime(timeUnit))).get(); + timerWheel.addRequest(new Request("1", timer.getCurrentTime(timeUnit))).get(); + timerWheel.addRequest(new Request("2", timer.getCurrentTime(timeUnit))).get(); + + Throwable exception = null; + try { + timerWheel.addRequest(new Request("3", timer.getCurrentTime(timeUnit))).get(); + } catch (Exception e) { + exception = e.getCause(); + } + Assert.assertNotNull(exception); + Assert.assertEquals("Rate limit exceeded", exception.getMessage()); + + for (int i = 0; i < timeOutPeriod; i++) { + tick(timeUnit, timer, timerWheel); + } + timerWheel.addRequest(new Request("4", timer.getCurrentTime(timeUnit))).get(); + timerWheel.addRequest(new Request("5", timer.getCurrentTime(timeUnit))).get(); + timerWheel.addRequest(new Request("6", timer.getCurrentTime(timeUnit))).get(); + + exception = null; + try { + timerWheel.addRequest(new Request("7", timer.getCurrentTime(timeUnit))).get(); + } catch (Exception e) { + exception = e.getCause(); + } + Assert.assertNotNull(exception); + Assert.assertEquals("Rate limit exceeded", exception.getMessage()); + } + + private void tick(TimeUnit timeUnit, TestTimer timer, TimerWheel timerWheel) throws Exception { + timer.setTime(timer.getCurrentTimeInMillis() + TimeUnit.MILLISECONDS.convert(1, timeUnit)); + timerWheel.flushRequests().get(); + } +} diff --git a/rate-limiter/src/test/java/TestTimer.java b/rate-limiter/src/test/java/TestTimer.java new file mode 100644 index 0000000..7c4cacf --- /dev/null +++ b/rate-limiter/src/test/java/TestTimer.java @@ -0,0 +1,14 @@ +import utils.Timer; + +public class TestTimer extends Timer { + private long currentTime = System.currentTimeMillis(); + + @Override + public long getCurrentTimeInMillis() { + return currentTime; + } + + public void setTime(final long currentTime) { + this.currentTime = currentTime; + } +} diff --git a/rate-limiter/target/classes/TimerWheel.class b/rate-limiter/target/classes/TimerWheel.class new file mode 100644 index 0000000..af03599 Binary files /dev/null and b/rate-limiter/target/classes/TimerWheel.class differ diff --git a/rate-limiter/target/classes/exceptions/RateLimitExceededException.class b/rate-limiter/target/classes/exceptions/RateLimitExceededException.class new file mode 100644 index 0000000..a53f121 Binary files /dev/null and b/rate-limiter/target/classes/exceptions/RateLimitExceededException.class differ diff --git a/rate-limiter/target/classes/models/Request.class b/rate-limiter/target/classes/models/Request.class new file mode 100644 index 0000000..2ddd366 Binary files /dev/null and b/rate-limiter/target/classes/models/Request.class differ diff --git a/rate-limiter/target/classes/utils/Timer.class b/rate-limiter/target/classes/utils/Timer.class new file mode 100644 index 0000000..e081937 Binary files /dev/null and b/rate-limiter/target/classes/utils/Timer.class differ diff --git a/rate-limiter/target/test-classes/RateLimitTest.class b/rate-limiter/target/test-classes/RateLimitTest.class new file mode 100644 index 0000000..1b8b30b Binary files /dev/null and b/rate-limiter/target/test-classes/RateLimitTest.class differ diff --git a/rate-limiter/target/test-classes/TestTimer.class b/rate-limiter/target/test-classes/TestTimer.class new file mode 100644 index 0000000..e0c143f Binary files /dev/null and b/rate-limiter/target/test-classes/TestTimer.class differ diff --git a/service-orchestrator/.idea/.gitignore b/service-orchestrator/.idea/.gitignore new file mode 100644 index 0000000..26d3352 --- /dev/null +++ b/service-orchestrator/.idea/.gitignore @@ -0,0 +1,3 @@ +# Default ignored files +/shelf/ +/workspace.xml diff --git a/service-orchestrator/.idea/compiler.xml b/service-orchestrator/.idea/compiler.xml new file mode 100644 index 0000000..a50050e --- /dev/null +++ b/service-orchestrator/.idea/compiler.xml @@ -0,0 +1,13 @@ + + + + + + + + + + + + + \ No newline at end of file diff --git a/service-orchestrator/.idea/jarRepositories.xml b/service-orchestrator/.idea/jarRepositories.xml new file mode 100644 index 0000000..712ab9d --- /dev/null +++ b/service-orchestrator/.idea/jarRepositories.xml @@ -0,0 +1,20 @@ + + + + + + + + + + + \ No newline at end of file diff --git a/service-orchestrator/.idea/misc.xml b/service-orchestrator/.idea/misc.xml new file mode 100644 index 0000000..d24ea8e --- /dev/null +++ b/service-orchestrator/.idea/misc.xml @@ -0,0 +1,14 @@ + + + + + + + + + + \ No newline at end of file diff --git a/service-orchestrator/.idea/uiDesigner.xml b/service-orchestrator/.idea/uiDesigner.xml new file mode 100644 index 0000000..e96534f --- /dev/null +++ b/service-orchestrator/.idea/uiDesigner.xml @@ -0,0 +1,124 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/service-orchestrator/.idea/vcs.xml b/service-orchestrator/.idea/vcs.xml new file mode 100644 index 0000000..6c0b863 --- /dev/null +++ b/service-orchestrator/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/service-orchestrator/pom.xml b/service-orchestrator/pom.xml new file mode 100644 index 0000000..a139c76 --- /dev/null +++ b/service-orchestrator/pom.xml @@ -0,0 +1,31 @@ + + + 4.0.0 + + org.example + service-orchestrator + 1.0 + + + + org.apache.maven.plugins + maven-compiler-plugin + + 11 + 11 + + + + + + + + junit + junit + 4.13 + test + + + \ No newline at end of file diff --git a/service-orchestrator/service-orchestrator.iml b/service-orchestrator/service-orchestrator.iml new file mode 100644 index 0000000..78b2cc5 --- /dev/null +++ b/service-orchestrator/service-orchestrator.iml @@ -0,0 +1,2 @@ + + \ No newline at end of file diff --git a/service-orchestrator/src/main/java/LoadBalancer.java b/service-orchestrator/src/main/java/LoadBalancer.java new file mode 100644 index 0000000..c885ca1 --- /dev/null +++ b/service-orchestrator/src/main/java/LoadBalancer.java @@ -0,0 +1,33 @@ +import models.Node; +import models.Request; +import models.Service; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class LoadBalancer { + private final Map services; + private final Map nodes; + + public LoadBalancer() { + this.services = new ConcurrentHashMap<>(); + this.nodes = new ConcurrentHashMap<>(); + } + + public void register(Service service) { + services.put(service.getId(), service); + } + + public void addNode(String serviceId, Node node) { + nodes.put(node.getId(), node); + services.get(serviceId).getRouter().addNode(node); + } + + public void removeNode(String serviceId, String nodeId) { + services.get(serviceId).getRouter().removeNode(nodes.remove(nodeId)); + } + + public Node getHandler(Request request) { + return services.get(request.getServiceId()).getRouter().getAssignedNode(request); + } +} diff --git a/service-orchestrator/src/main/java/algorithms/ConsistentHashing.java b/service-orchestrator/src/main/java/algorithms/ConsistentHashing.java new file mode 100644 index 0000000..9ddd4c3 --- /dev/null +++ b/service-orchestrator/src/main/java/algorithms/ConsistentHashing.java @@ -0,0 +1,57 @@ +package algorithms; + +import models.Node; +import models.Request; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.Function; + +public class ConsistentHashing implements Router { + private final Map> nodePositions; + private final ConcurrentSkipListMap nodeMappings; + private final Function hashFunction; + private final int pointMultiplier; + + + public ConsistentHashing(final Function hashFunction, + final int pointMultiplier) { + if (pointMultiplier == 0) { + throw new IllegalArgumentException(); + } + this.pointMultiplier = pointMultiplier; + this.hashFunction = hashFunction; + this.nodePositions = new ConcurrentHashMap<>(); + this.nodeMappings = new ConcurrentSkipListMap<>(); + } + + public void addNode(Node node) { + nodePositions.put(node, new CopyOnWriteArrayList<>()); + for (int i = 0; i < pointMultiplier; i++) { + for (int j = 0; j < node.getWeight(); j++) { + final var point = hashFunction.apply((i * pointMultiplier + j) + node.getId()); + nodePositions.get(node).add(point); + nodeMappings.put(point, node); + } + } + } + + public void removeNode(Node node) { + for (final Long point : nodePositions.remove(node)) { + nodeMappings.remove(point); + } + } + + public Node getAssignedNode(Request request) { + final var key = hashFunction.apply(request.getId()); + final var entry = nodeMappings.higherEntry(key); + if (entry == null) { + return nodeMappings.firstEntry().getValue(); + } else { + return entry.getValue(); + } + } +} diff --git a/service-orchestrator/src/main/java/algorithms/Router.java b/service-orchestrator/src/main/java/algorithms/Router.java new file mode 100644 index 0000000..7b300a5 --- /dev/null +++ b/service-orchestrator/src/main/java/algorithms/Router.java @@ -0,0 +1,12 @@ +package algorithms; + +import models.Node; +import models.Request; + +public interface Router { + void addNode(Node node); + + void removeNode(Node node); + + Node getAssignedNode(Request request); +} diff --git a/service-orchestrator/src/main/java/algorithms/WeightedRoundRobin.java b/service-orchestrator/src/main/java/algorithms/WeightedRoundRobin.java new file mode 100644 index 0000000..e8919e3 --- /dev/null +++ b/service-orchestrator/src/main/java/algorithms/WeightedRoundRobin.java @@ -0,0 +1,47 @@ +package algorithms; + +import models.Node; +import models.Request; + +import java.util.ArrayList; +import java.util.List; + +public class WeightedRoundRobin implements Router { + private final List nodes; + private int assignTo; + private int currentNodeAssignments; + private final Object lock; + + public WeightedRoundRobin() { + this.nodes = new ArrayList<>(); + this.assignTo = 0; + this.lock = new Object(); + } + + public void addNode(Node node) { + synchronized (this.lock) { + nodes.add(node); + } + } + + public void removeNode(Node node) { + synchronized (this.lock) { + nodes.remove(node); + assignTo--; + currentNodeAssignments = 0; + } + } + + public Node getAssignedNode(Request request) { + synchronized (this.lock) { + assignTo = assignTo % nodes.size(); + final var currentNode = nodes.get(assignTo); + currentNodeAssignments++; + if (currentNodeAssignments == currentNode.getWeight()) { + assignTo++; + currentNodeAssignments = 0; + } + return currentNode; + } + } +} diff --git a/service-orchestrator/src/main/java/models/Node.java b/service-orchestrator/src/main/java/models/Node.java new file mode 100644 index 0000000..8231120 --- /dev/null +++ b/service-orchestrator/src/main/java/models/Node.java @@ -0,0 +1,45 @@ +package models; + +import java.util.Objects; + +public class Node { + private final String id; + private final int weight; + private final String ipAddress; + + public Node(String id, String ipAddress) { + this(id, ipAddress, 1); + } + + public Node(String id, String ipAddress, int weight) { + this.id = id; + this.weight = weight; + this.ipAddress = ipAddress; + } + + public String getId() { + return id; + } + + public int getWeight() { + return weight; + } + + public String getIpAddress() { + return ipAddress; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Node node = (Node) o; + return id.equals(node.id); + } + + @Override + public int hashCode() { + return Objects.hash(id); + } + +} diff --git a/service-orchestrator/src/main/java/models/Request.java b/service-orchestrator/src/main/java/models/Request.java new file mode 100644 index 0000000..c59b1bd --- /dev/null +++ b/service-orchestrator/src/main/java/models/Request.java @@ -0,0 +1,26 @@ +package models; + +public class Request { + private final String id; + + private final String serviceId; + private final String method; + + public Request(String id, String serviceId, String method) { + this.id = id; + this.serviceId = serviceId; + this.method = method; + } + + public String getId() { + return id; + } + + public String getServiceId() { + return serviceId; + } + + public String getMethod() { + return method; + } +} diff --git a/service-orchestrator/src/main/java/models/Service.java b/service-orchestrator/src/main/java/models/Service.java new file mode 100644 index 0000000..f42bef0 --- /dev/null +++ b/service-orchestrator/src/main/java/models/Service.java @@ -0,0 +1,27 @@ +package models; + +import algorithms.Router; + +public class Service { + private final Router router; + private final String id; + private final String[] methods; + + public Service(String id, Router router, String[] methods) { + this.router = router; + this.id = id; + this.methods = methods; + } + + public Router getRouter() { + return router; + } + + public String getId() { + return id; + } + + public String[] getMethods() { + return methods; + } +} diff --git a/service-orchestrator/src/test/java/LBTester.java b/service-orchestrator/src/test/java/LBTester.java new file mode 100644 index 0000000..9cd6da5 --- /dev/null +++ b/service-orchestrator/src/test/java/LBTester.java @@ -0,0 +1,70 @@ +import algorithms.ConsistentHashing; +import algorithms.WeightedRoundRobin; +import models.Node; +import models.Request; +import models.Service; +import org.junit.Assert; +import org.junit.Test; + +public class LBTester { + @Test + public void LBDefaultBehaviour() { + LoadBalancer loadBalancer = new LoadBalancer(); + final var consistentHashing = new ConsistentHashing(point -> (long) Math.abs(point.hashCode()) % 100, 1); + final String profileServiceId = "profile", smsServiceId = "sms", emailServiceId = "email"; + + loadBalancer.register(new Service(profileServiceId, consistentHashing, new String[]{"addProfile", "deleteProfile", "updateProfile"})); + loadBalancer.register(new Service(smsServiceId, new WeightedRoundRobin(), new String[]{"sendSms", "addTemplate", "getSMSForUser"})); + loadBalancer.register(new Service(emailServiceId, new WeightedRoundRobin(), new String[]{"sendEmail", "addTemplate", "getSMSForUser"})); + + final Node pNode1 = new Node("51", "35.45.55.65", 2), pNode2 = new Node("22", "35.45.55.66", 3); + loadBalancer.addNode(profileServiceId, pNode1); + loadBalancer.addNode(profileServiceId, pNode2); + + final Node sNode1 = new Node("13", "35.45.55.67"), sNode2 = new Node("64", "35.45.55.68"); + loadBalancer.addNode(smsServiceId, sNode1); + loadBalancer.addNode(smsServiceId, sNode2); + + final Node eNode1 = new Node("node-35", "35.45.55.69", 2), eNode2 = new Node("node-76", "35.45.55.70"); + loadBalancer.addNode(emailServiceId, eNode1); + loadBalancer.addNode(emailServiceId, eNode2); + + var profileNode1 = loadBalancer.getHandler(new Request("r-123", profileServiceId, "addProfile")); + var profileNode2 = loadBalancer.getHandler(new Request("r-244", profileServiceId, "addProfile")); + var profileNode3 = loadBalancer.getHandler(new Request("r-659", profileServiceId, "addProfile")); + var profileNode4 = loadBalancer.getHandler(new Request("r-73", profileServiceId, "addProfile")); + Assert.assertEquals(pNode1, profileNode1); + Assert.assertEquals(pNode1, profileNode2); + Assert.assertEquals(pNode2, profileNode3); + Assert.assertEquals(pNode1, profileNode4); + + loadBalancer.removeNode(profileServiceId, pNode1.getId()); + + profileNode1 = loadBalancer.getHandler(new Request("r-123", profileServiceId, "addProfile")); + profileNode2 = loadBalancer.getHandler(new Request("r-244", profileServiceId, "addProfile")); + profileNode3 = loadBalancer.getHandler(new Request("r-659", profileServiceId, "addProfile")); + profileNode4 = loadBalancer.getHandler(new Request("r-73", profileServiceId, "addProfile")); + Assert.assertEquals(pNode2, profileNode1); + Assert.assertEquals(pNode2, profileNode2); + Assert.assertEquals(pNode2, profileNode3); + Assert.assertEquals(pNode2, profileNode4); + + final var smsNode1 = loadBalancer.getHandler(new Request("r-124", smsServiceId, "addTemplate")); + final var smsNode2 = loadBalancer.getHandler(new Request("r-1214", smsServiceId, "addTemplate")); + final var smsNode3 = loadBalancer.getHandler(new Request("r-4", smsServiceId, "addTemplate")); + + Assert.assertEquals(sNode1, smsNode1); + Assert.assertEquals(sNode2, smsNode2); + Assert.assertEquals(sNode1, smsNode3); + + final var emailNode1 = loadBalancer.getHandler(new Request("r-1232", emailServiceId, "addTemplate")); + final var emailNode2 = loadBalancer.getHandler(new Request("r-4134", emailServiceId, "addTemplate")); + final var emailNode3 = loadBalancer.getHandler(new Request("r-23432", emailServiceId, "addTemplate")); + final var emailNode4 = loadBalancer.getHandler(new Request("r-5345", emailServiceId, "addTemplate")); + + Assert.assertEquals(eNode1, emailNode1); + Assert.assertEquals(eNode1, emailNode2); + Assert.assertEquals(eNode2, emailNode3); + Assert.assertEquals(eNode1, emailNode4); + } +} diff --git a/service-orchestrator/src/test/java/RouterTester.java b/service-orchestrator/src/test/java/RouterTester.java new file mode 100644 index 0000000..3c6e91e --- /dev/null +++ b/service-orchestrator/src/test/java/RouterTester.java @@ -0,0 +1,140 @@ +import algorithms.ConsistentHashing; +import algorithms.Router; +import algorithms.WeightedRoundRobin; +import models.Node; +import models.Request; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Function; + +public class RouterTester { + String ipAddress = "127.0.0.1", serviceId = "service", method = "method"; + + @Test + public void defaultRoundRobin() { + final Router router = new WeightedRoundRobin(); + final Node node1 = newNode("node-1"), node2 = newNode("node-2"), node3 = newNode("node-3"); + router.addNode(node1); + router.addNode(node2); + router.addNode(node3); + + Assert.assertEquals(node1, router.getAssignedNode(newRequest("r-123"))); + Assert.assertEquals(node2, router.getAssignedNode(newRequest("r-124"))); + Assert.assertEquals(node3, router.getAssignedNode(newRequest("r-125"))); + + router.removeNode(node1); + + Assert.assertEquals(node2, router.getAssignedNode(newRequest("r-125"))); + Assert.assertEquals(node3, router.getAssignedNode(newRequest("r-126"))); + Assert.assertEquals(node2, router.getAssignedNode(newRequest("r-127"))); + Assert.assertEquals(node3, router.getAssignedNode(newRequest("r-128"))); + + final Node node4 = new Node("node-4", ipAddress, 2); + router.addNode(node4); + + Assert.assertEquals(node4, router.getAssignedNode(newRequest("r-129"))); + Assert.assertEquals(node4, router.getAssignedNode(newRequest("r-130"))); + Assert.assertEquals(node2, router.getAssignedNode(newRequest("r-131"))); + } + + @Test + public void defaultConsistentHashing() { + final List hashes = new ArrayList<>(); + hashes.add(1L); + hashes.add(11L); + hashes.add(21L); + hashes.add(31L); + final Function hashFunction = id -> { + if (id.contains("000000")) { + return hashes.remove(0); + } else { + return Long.parseLong(id); + } + }; + final Router router = new ConsistentHashing(hashFunction, 1); + final Node node1 = newNode("1000000"), node2 = newNode("2000000"), node3 = newNode("3000000"); + router.addNode(node1); + router.addNode(node2); + router.addNode(node3); + + Assert.assertEquals(node1, router.getAssignedNode(newRequest("35"))); + Assert.assertEquals(node2, router.getAssignedNode(newRequest("5"))); + Assert.assertEquals(node3, router.getAssignedNode(newRequest("15"))); + + router.removeNode(node1); + + Assert.assertEquals(node2, router.getAssignedNode(newRequest("22"))); + Assert.assertEquals(node3, router.getAssignedNode(newRequest("12"))); + Assert.assertEquals(node2, router.getAssignedNode(newRequest("23"))); + Assert.assertEquals(node3, router.getAssignedNode(newRequest("13"))); + + final Node node4 = newNode("4000000"); + router.addNode(node4); + + Assert.assertEquals(node4, router.getAssignedNode(newRequest("25"))); + } + + @Test(expected = IllegalArgumentException.class) + public void consistentHashingConstruction() { + new ConsistentHashing(Long::valueOf, 0); + } + + @Test + public void consistentHashingWithWeights() { + final List hashes = new ArrayList<>(); + hashes.add(1L); // remaining is node 1 + hashes.add(21L); // 12 to 21 is for node 1 + hashes.add(11L); // 2 to 11 is for node 2 + hashes.add(41L); // 32 to 41 is for node 2 + hashes.add(31L); // 22 to 31 is for node 3 + hashes.add(51L); // 42 to 51 is for node 3 --> 10 points + final Function hashFunction = id -> { + //range should be (0, 60) + if (id.contains("000000")) { + return hashes.remove(0); + } else { + return Long.parseLong(id); + } + }; + final Router router = new ConsistentHashing(hashFunction, 2); + final Node node1 = newNode("1000000"), node2 = newNode("2000000"), node3 = newNode("3000000"); + router.addNode(node1); + router.addNode(node2); + router.addNode(node3); + + Assert.assertEquals(node1, router.getAssignedNode(newRequest("55"))); + Assert.assertEquals(node1, router.getAssignedNode(newRequest("15"))); + Assert.assertEquals(node2, router.getAssignedNode(newRequest("8"))); + Assert.assertEquals(node2, router.getAssignedNode(newRequest("33"))); + Assert.assertEquals(node3, router.getAssignedNode(newRequest("28"))); + Assert.assertEquals(node3, router.getAssignedNode(newRequest("47"))); + + router.removeNode(node1); + // remaining is node 2 + // 12 to 21 is now for node 3 + Assert.assertEquals(node2, router.getAssignedNode(newRequest("58"))); + Assert.assertEquals(node3, router.getAssignedNode(newRequest("12"))); + Assert.assertEquals(node3, router.getAssignedNode(newRequest("23"))); + Assert.assertEquals(node2, router.getAssignedNode(newRequest("54"))); + + final Node node4 = newNode("4000000"); + hashes.add(6L); // 0 to 6 is for node 4, 52 to remaining is for node 4 + hashes.add(26L); // 12 to 26 is for node 4 + router.addNode(node4); + + Assert.assertEquals(node4, router.getAssignedNode(newRequest("15"))); + Assert.assertEquals(node4, router.getAssignedNode(newRequest("59"))); + Assert.assertEquals(node4, router.getAssignedNode(newRequest("5"))); + } + + private Request newRequest(String s) { + return new Request(s, serviceId, method); + } + + private Node newNode(String s) { + return new Node(s, ipAddress); + } +} diff --git a/service-orchestrator/target/classes/LoadBalancer.class b/service-orchestrator/target/classes/LoadBalancer.class new file mode 100644 index 0000000..0cef8c9 Binary files /dev/null and b/service-orchestrator/target/classes/LoadBalancer.class differ diff --git a/service-orchestrator/target/classes/META-INF/service-orchestrator.kotlin_module b/service-orchestrator/target/classes/META-INF/service-orchestrator.kotlin_module new file mode 100644 index 0000000..a49347a Binary files /dev/null and b/service-orchestrator/target/classes/META-INF/service-orchestrator.kotlin_module differ diff --git a/service-orchestrator/target/classes/algorithms/ConsistentHashing.class b/service-orchestrator/target/classes/algorithms/ConsistentHashing.class new file mode 100644 index 0000000..789ee5b Binary files /dev/null and b/service-orchestrator/target/classes/algorithms/ConsistentHashing.class differ diff --git a/service-orchestrator/target/classes/algorithms/Router.class b/service-orchestrator/target/classes/algorithms/Router.class new file mode 100644 index 0000000..25a1997 Binary files /dev/null and b/service-orchestrator/target/classes/algorithms/Router.class differ diff --git a/service-orchestrator/target/classes/algorithms/WeightedRoundRobin.class b/service-orchestrator/target/classes/algorithms/WeightedRoundRobin.class new file mode 100644 index 0000000..f6c18e9 Binary files /dev/null and b/service-orchestrator/target/classes/algorithms/WeightedRoundRobin.class differ diff --git a/service-orchestrator/target/classes/models/Node.class b/service-orchestrator/target/classes/models/Node.class new file mode 100644 index 0000000..570d4c5 Binary files /dev/null and b/service-orchestrator/target/classes/models/Node.class differ diff --git a/service-orchestrator/target/classes/models/Request.class b/service-orchestrator/target/classes/models/Request.class new file mode 100644 index 0000000..4f77d9e Binary files /dev/null and b/service-orchestrator/target/classes/models/Request.class differ diff --git a/service-orchestrator/target/classes/models/Service.class b/service-orchestrator/target/classes/models/Service.class new file mode 100644 index 0000000..df903f7 Binary files /dev/null and b/service-orchestrator/target/classes/models/Service.class differ diff --git a/service-orchestrator/target/test-classes/LBTester.class b/service-orchestrator/target/test-classes/LBTester.class new file mode 100644 index 0000000..54f716f Binary files /dev/null and b/service-orchestrator/target/test-classes/LBTester.class differ diff --git a/service-orchestrator/target/test-classes/META-INF/service-orchestrator.kotlin_module b/service-orchestrator/target/test-classes/META-INF/service-orchestrator.kotlin_module new file mode 100644 index 0000000..a49347a Binary files /dev/null and b/service-orchestrator/target/test-classes/META-INF/service-orchestrator.kotlin_module differ diff --git a/service-orchestrator/target/test-classes/RouterTester.class b/service-orchestrator/target/test-classes/RouterTester.class new file mode 100644 index 0000000..23e9091 Binary files /dev/null and b/service-orchestrator/target/test-classes/RouterTester.class differ