Added LLD projects

This commit is contained in:
Your Name
2020-07-15 18:02:38 +05:30
commit 0d2fabc962
135 changed files with 3319 additions and 0 deletions

View File

@@ -0,0 +1,203 @@
import com.google.inject.Inject;
import com.google.inject.Singleton;
import exceptions.RetryLimitExceededException;
import exceptions.UnsubscribedPollException;
import lib.KeyedExecutor;
import models.Event;
import models.FailureEvent;
import models.Subscription;
import util.Timer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.function.Function;
import java.util.function.Predicate;
@Singleton
public class EventBus {
private final Map<String, List<Event>> topics;
private final Map<String, Map<String, Integer>> eventIndexes;
private final Map<String, ConcurrentSkipListMap<Long, String>> eventTimestamps;
private final Map<String, Map<String, Subscription>> pullSubscriptions;
private final Map<String, Map<String, Subscription>> pushSubscriptions;
private final KeyedExecutor<String> eventExecutor;
private final KeyedExecutor<String> broadcastExecutor;
private EventBus deadLetterQueue;
private final Timer timer;
@Inject
public EventBus(final KeyedExecutor<String> eventExecutor, final KeyedExecutor<String> broadcastExecutor, final Timer timer) {
this.topics = new ConcurrentHashMap<>();
this.eventIndexes = new ConcurrentHashMap<>();
this.eventTimestamps = new ConcurrentHashMap<>();
this.pullSubscriptions = new ConcurrentHashMap<>();
this.pushSubscriptions = new ConcurrentHashMap<>();
this.eventExecutor = eventExecutor;
this.broadcastExecutor = broadcastExecutor;
this.timer = timer;
}
public void setDeadLetterQueue(final EventBus deadLetterQueue) {
this.deadLetterQueue = deadLetterQueue;
}
public CompletionStage<Void> publish(final String topic, final Event event) {
return eventExecutor.getThreadFor(topic, publishToBus(topic, event));
}
private CompletionStage<Void> publishToBus(final String topic, final Event event) {
if (eventIndexes.containsKey(topic) && eventIndexes.get(topic).containsKey(event.getId())) {
return null;
}
topics.putIfAbsent(topic, new CopyOnWriteArrayList<>());
eventIndexes.putIfAbsent(topic, new ConcurrentHashMap<>());
eventIndexes.get(topic).put(event.getId(), topics.get(topic).size());
eventTimestamps.putIfAbsent(topic, new ConcurrentSkipListMap<>());
eventTimestamps.get(topic).put(timer.getCurrentTime(), event.getId());
topics.get(topic).add(event);
return notifyPushSubscribers(topic, event);
}
private CompletionStage<Void> notifyPushSubscribers(String topic, Event event) {
if (!pushSubscriptions.containsKey(topic)) {
return CompletableFuture.completedStage(null);
}
final var subscribersForTopic = pushSubscriptions.get(topic);
final var notifications = subscribersForTopic.values()
.stream()
.filter(subscription -> subscription.getPrecondition().test(event))
.map(subscription -> executeEventHandler(event, subscription))
.toArray(CompletableFuture[]::new);
return CompletableFuture.allOf(notifications);
}
private CompletionStage<Void> executeEventHandler(final Event event, Subscription subscription) {
return broadcastExecutor.getThreadFor(subscription.getTopic() + subscription.getSubscriber(),
doWithRetry(event, subscription.getEventHandler(),
1, subscription.getNumberOfRetries())
.exceptionally(throwable -> {
if (deadLetterQueue != null) {
deadLetterQueue.publish(subscription.getTopic(), new FailureEvent(event, throwable, timer.getCurrentTime()));
}
return null;
}));
}
private CompletionStage<Void> doWithRetry(final Event event,
final Function<Event, CompletionStage<Void>> task,
final int coolDownIntervalInMillis,
final int remainingTries) {
return task.apply(event).handle((__, throwable) -> {
if (throwable != null) {
if (remainingTries == 1) {
throw new RetryLimitExceededException(throwable);
}
try {
Thread.sleep(coolDownIntervalInMillis);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return doWithRetry(event, task, Math.max(coolDownIntervalInMillis * 2, 10), remainingTries - 1);
} else {
return CompletableFuture.completedFuture((Void) null);
}
}).thenCompose(Function.identity());
}
public CompletionStage<Event> poll(final String topic, final String subscriber) {
return eventExecutor.getThreadFor(topic + subscriber, () -> pollBus(topic, subscriber));
}
private Event pollBus(final String topic, final String subscriber) {
var subscription = pullSubscriptions.getOrDefault(topic, new HashMap<>()).get(subscriber);
if (subscription == null) {
throw new UnsubscribedPollException();
}
for (var index = subscription.getCurrentIndex(); index.intValue() < topics.get(topic).size(); index.increment()) {
var event = topics.get(topic).get(index.intValue());
if (subscription.getPrecondition().test(event)) {
index.increment();
return event;
}
}
return null;
}
public CompletionStage<Void> subscribeToEventsAfter(final String topic, final String subscriber, final long timeStamp) {
return eventExecutor.getThreadFor(topic + subscriber, () -> moveIndexAtTimestamp(topic, subscriber, timeStamp));
}
private void moveIndexAtTimestamp(final String topic, final String subscriber, final long timeStamp) {
final var closestEventAfter = eventTimestamps.get(topic).higherEntry(timeStamp);
if (closestEventAfter == null) {
pullSubscriptions.get(topic).get(subscriber).setCurrentIndex(eventIndexes.get(topic).size());
} else {
final var eventIndex = eventIndexes.get(topic).get(closestEventAfter.getValue());
pullSubscriptions.get(topic).get(subscriber).setCurrentIndex(eventIndex);
}
}
public CompletionStage<Void> subscribeToEventsAfter(final String topic, final String subscriber, final String eventId) {
return eventExecutor.getThreadFor(topic + subscriber, () -> moveIndexAfterEvent(topic, subscriber, eventId));
}
private void moveIndexAfterEvent(final String topic, final String subscriber, final String eventId) {
if (eventId == null) {
pullSubscriptions.get(topic).get(subscriber).setCurrentIndex(0);
} else {
final var eventIndex = eventIndexes.get(topic).get(eventId) + 1;
pullSubscriptions.get(topic).get(subscriber).setCurrentIndex(eventIndex);
}
}
public CompletionStage<Void> subscribeForPush(final String topic,
final String subscriber,
final Predicate<Event> precondition,
final Function<Event, CompletionStage<Void>> handler,
final int numberOfRetries) {
return eventExecutor.getThreadFor(topic + subscriber,
() -> subscribeForPushEvents(topic, subscriber, precondition, handler, numberOfRetries));
}
private void subscribeForPushEvents(final String topic,
final String subscriber,
final Predicate<Event> precondition,
final Function<Event, CompletionStage<Void>> handler,
final int numberOfRetries) {
addSubscriber(pushSubscriptions, subscriber, precondition, topic, handler, numberOfRetries);
}
private void addSubscriber(final Map<String, Map<String, Subscription>> pullSubscriptions,
final String subscriber,
final Predicate<Event> precondition,
final String topic,
final Function<Event, CompletionStage<Void>> handler,
final int numberOfRetries) {
pullSubscriptions.putIfAbsent(topic, new ConcurrentHashMap<>());
final var subscription = new Subscription(topic, subscriber, precondition, handler, numberOfRetries);
subscription.setCurrentIndex(topics.getOrDefault(topic, new ArrayList<>()).size());
pullSubscriptions.get(topic).put(subscriber, subscription);
}
public CompletionStage<Void> subscribeForPull(final String topic, final String subscriber, final Predicate<Event> precondition) {
return eventExecutor.getThreadFor(topic + subscriber, () -> subscribeForPullEvents(topic, subscriber, precondition));
}
private void subscribeForPullEvents(final String topic, final String subscriber, final Predicate<Event> precondition) {
addSubscriber(pullSubscriptions, subscriber, precondition, topic, null, 0);
}
public CompletionStage<Void> unsubscribe(final String topic, final String subscriber) {
return eventExecutor.getThreadFor(topic + subscriber, () -> unsubscribeFromTopic(topic, subscriber));
}
private void unsubscribeFromTopic(final String topic, final String subscriber) {
pushSubscriptions.getOrDefault(topic, new HashMap<>()).remove(subscriber);
pullSubscriptions.getOrDefault(topic, new HashMap<>()).remove(subscriber);
}
}

View File

@@ -0,0 +1,7 @@
package exceptions;
public class RetryLimitExceededException extends RuntimeException {
public RetryLimitExceededException(Throwable cause) {
super(cause);
}
}

View File

@@ -0,0 +1,4 @@
package exceptions;
public class UnsubscribedPollException extends RuntimeException {
}

View File

@@ -0,0 +1,31 @@
package lib;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.function.Supplier;
public class KeyedExecutor<KEY> {
private final Executor[] executorPool;
public KeyedExecutor(final int poolSize) {
this.executorPool = new Executor[poolSize];
for (int i = 0; i < poolSize; i++) {
executorPool[i] = Executors.newSingleThreadExecutor();
}
}
public CompletionStage<Void> getThreadFor(KEY key, Runnable task) {
return CompletableFuture.runAsync(task, executorPool[Math.abs(key.hashCode() % executorPool.length)]);
}
public <U> CompletionStage<U> getThreadFor(KEY key, Supplier<U> task) {
return CompletableFuture.supplyAsync(task, executorPool[Math.abs(key.hashCode() % executorPool.length)]);
}
public <U> CompletionStage<U> getThreadFor(KEY key, CompletionStage<U> task) {
return CompletableFuture.supplyAsync(() -> task, executorPool[Math.abs(key.hashCode() % executorPool.length)]).thenCompose(Function.identity());
}
}

View File

@@ -0,0 +1,43 @@
package models;
import java.util.Objects;
import java.util.UUID;
public class Event {
private final String id;
private final String publisher;
private final EventType eventType;
private final String description;
private final long creationTime;
public Event(final String publisher,
final EventType eventType,
final String description,
final long creationTime) {
this.description = description;
this.id = UUID.randomUUID().toString();
this.publisher = publisher;
this.eventType = eventType;
this.creationTime = creationTime;
}
public String getId() {
return id;
}
public String getPublisher() {
return publisher;
}
public EventType getEventType() {
return eventType;
}
public long getCreationTime() {
return creationTime;
}
public String getDescription() {
return description;
}
}

View File

@@ -0,0 +1,5 @@
package models;
public enum EventType {
PRIORITY, LOGGING, ERROR
}

View File

@@ -0,0 +1,20 @@
package models;
public class FailureEvent extends Event {
private final Event event;
private final Throwable throwable;
public FailureEvent(Event event, Throwable throwable, long failureTimestamp) {
super("dead-letter-queue", EventType.ERROR, throwable.getMessage(), failureTimestamp);
this.event = event;
this.throwable = throwable;
}
public Event getEvent() {
return event;
}
public Throwable getThrowable() {
return throwable;
}
}

View File

@@ -0,0 +1,57 @@
package models;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
import java.util.function.Predicate;
public class Subscription {
private final String topic;
private final String subscriber;
private final Predicate<Event> precondition;
private final Function<Event, CompletionStage<Void>> eventHandler;
private final int numberOfRetries;
private final LongAdder currentIndex;
public Subscription(final String topic,
final String subscriber,
final Predicate<Event> precondition,
final Function<Event, CompletionStage<Void>> eventHandler,
final int numberOfRetries) {
this.topic = topic;
this.subscriber = subscriber;
this.precondition = precondition;
this.eventHandler = eventHandler;
this.currentIndex = new LongAdder();
this.numberOfRetries = numberOfRetries;
}
public String getTopic() {
return topic;
}
public String getSubscriber() {
return subscriber;
}
public Predicate<Event> getPrecondition() {
return precondition;
}
public Function<Event, CompletionStage<Void>> getEventHandler() {
return eventHandler;
}
public LongAdder getCurrentIndex() {
return currentIndex;
}
public void setCurrentIndex(final int offset) {
currentIndex.reset();
currentIndex.add(offset);
}
public int getNumberOfRetries() {
return numberOfRetries;
}
}

View File

@@ -0,0 +1,10 @@
package util;
import com.google.inject.Singleton;
@Singleton
public class Timer {
public long getCurrentTime() {
return System.nanoTime();
}
}

View File

@@ -0,0 +1,340 @@
import com.google.gson.Gson;
import exceptions.RetryLimitExceededException;
import exceptions.UnsubscribedPollException;
import lib.KeyedExecutor;
import models.Event;
import models.EventType;
import models.FailureEvent;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import util.Timer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
// Causal ordering of topics
public class EventBusTest {
public static final String TOPIC_1 = "topic-1";
public static final String TOPIC_2 = "topic-2";
public static final String PUBLISHER_1 = "publisher-1";
public static final String SUBSCRIBER_1 = "subscriber-1";
public static final String SUBSCRIBER_2 = "subscriber-2";
private Timer timer;
private KeyedExecutor<String> keyedExecutor;
private KeyedExecutor<String> broadcastExecutor;
@Before
public void setUp() {
keyedExecutor = new KeyedExecutor<>(16);
broadcastExecutor = new KeyedExecutor<>(16);
timer = new Timer();
}
private Event constructEvent(EventType priority, String description) {
return new Event(PUBLISHER_1, priority, description, timer.getCurrentTime());
}
@Test
public void defaultBehavior() {
final EventBus eventBus = new EventBus(keyedExecutor, broadcastExecutor, timer);
eventBus.publish(TOPIC_1, constructEvent(EventType.LOGGING, "first event"));
eventBus.subscribeForPull(TOPIC_1, SUBSCRIBER_1, (event) -> true).toCompletableFuture().join();
Assert.assertNull(eventBus.poll(TOPIC_1, SUBSCRIBER_1).toCompletableFuture().join());
eventBus.publish(TOPIC_1, constructEvent(EventType.PRIORITY, "second event"));
final Event secondEvent = eventBus.poll(TOPIC_1, SUBSCRIBER_1).toCompletableFuture().join();
Assert.assertEquals(EventType.PRIORITY, secondEvent.getEventType());
Assert.assertEquals("second event", secondEvent.getDescription());
eventBus.subscribeToEventsAfter(TOPIC_1, SUBSCRIBER_1, null).toCompletableFuture().join();
final Event firstEvent = eventBus.poll(TOPIC_1, SUBSCRIBER_1).toCompletableFuture().join();
Assert.assertEquals(EventType.LOGGING, firstEvent.getEventType());
Assert.assertEquals("first event", firstEvent.getDescription());
Assert.assertEquals(PUBLISHER_1, firstEvent.getPublisher());
final List<Event> eventCollector = new ArrayList<>();
eventBus.subscribeForPush(TOPIC_1,
SUBSCRIBER_2,
(event) -> true,
(event) -> CompletableFuture.runAsync(() -> eventCollector.add(event)),
0).toCompletableFuture().join();
eventBus.publish(TOPIC_1, constructEvent(EventType.ERROR, "third event")).toCompletableFuture().join();
Assert.assertEquals(EventType.ERROR, eventCollector.get(0).getEventType());
Assert.assertEquals("third event", eventCollector.get(0).getDescription());
eventBus.unsubscribe(TOPIC_1, SUBSCRIBER_1).toCompletableFuture().join();
eventBus.publish(TOPIC_1, constructEvent(EventType.LOGGING, "fourth event")).toCompletableFuture().join();
Assert.assertTrue(eventBus.poll(TOPIC_1, SUBSCRIBER_1)
.handle((__, throwable) -> throwable.getCause() instanceof UnsubscribedPollException)
.toCompletableFuture().join());
eventCollector.clear();
eventBus.unsubscribe(TOPIC_1, SUBSCRIBER_2).toCompletableFuture().join();
eventBus.publish(TOPIC_1, constructEvent(EventType.LOGGING, "fifth event")).toCompletableFuture().join();
Assert.assertTrue(eventCollector.isEmpty());
}
@Test
public void indexMove() {
final EventBus eventBus = new EventBus(keyedExecutor, broadcastExecutor, timer);
eventBus.subscribeForPull(TOPIC_1, SUBSCRIBER_1, (event) -> true).toCompletableFuture().join();
final Event firstEvent = constructEvent(EventType.PRIORITY, "first event");
final Event secondEvent = constructEvent(EventType.PRIORITY, "second event");
final Event thirdEvent = constructEvent(EventType.PRIORITY, "third event");
eventBus.publish(TOPIC_1, firstEvent).toCompletableFuture().join();
eventBus.publish(TOPIC_1, secondEvent).toCompletableFuture().join();
eventBus.publish(TOPIC_1, thirdEvent).toCompletableFuture().join();
eventBus.subscribeToEventsAfter(TOPIC_1, SUBSCRIBER_1, secondEvent.getId()).toCompletableFuture().join();
final Event firstPoll = eventBus.poll(TOPIC_1, SUBSCRIBER_1).toCompletableFuture().join();
Assert.assertEquals("third event", firstPoll.getDescription());
Assert.assertNull(eventBus.poll(TOPIC_1, SUBSCRIBER_1).toCompletableFuture().join());
eventBus.subscribeToEventsAfter(TOPIC_1, SUBSCRIBER_1, null).toCompletableFuture().join();
final Event secondPoll = eventBus.poll(TOPIC_1, SUBSCRIBER_1).toCompletableFuture().join();
Assert.assertEquals("first event", secondPoll.getDescription());
eventBus.subscribeToEventsAfter(TOPIC_1, SUBSCRIBER_1, firstEvent.getId()).toCompletableFuture().join();
final Event thirdPoll = eventBus.poll(TOPIC_1, SUBSCRIBER_1).toCompletableFuture().join();
Assert.assertEquals("second event", thirdPoll.getDescription());
eventBus.subscribeToEventsAfter(TOPIC_1, SUBSCRIBER_1, thirdEvent.getId()).toCompletableFuture().join();
Assert.assertNull(eventBus.poll(TOPIC_1, SUBSCRIBER_1).toCompletableFuture().join());
}
@Test
public void timestampMove() {
final TestTimer timer = new TestTimer();
final EventBus eventBus = new EventBus(keyedExecutor, broadcastExecutor, timer);
eventBus.subscribeForPull(TOPIC_1, SUBSCRIBER_1, (event) -> true).toCompletableFuture().join();
final Event firstEvent = new Event(PUBLISHER_1, EventType.PRIORITY, "first event", timer.getCurrentTime());
eventBus.publish(TOPIC_1, firstEvent).toCompletableFuture().join();
timer.setCurrentTime(timer.getCurrentTime() + Duration.ofSeconds(10).toNanos());
final Event secondEvent = new Event(PUBLISHER_1, EventType.PRIORITY, "second event", timer.getCurrentTime());
eventBus.publish(TOPIC_1, secondEvent).toCompletableFuture().join();
timer.setCurrentTime(timer.getCurrentTime() + Duration.ofSeconds(10).toNanos());
final Event thirdEvent = new Event(PUBLISHER_1, EventType.PRIORITY, "third event", timer.getCurrentTime());
eventBus.publish(TOPIC_1, thirdEvent).toCompletableFuture().join();
eventBus.subscribeToEventsAfter(TOPIC_1, SUBSCRIBER_1, secondEvent.getCreationTime() + Duration.ofSeconds(5).toNanos()).toCompletableFuture().join();
final Event firstPoll = eventBus.poll(TOPIC_1, SUBSCRIBER_1).toCompletableFuture().join();
Assert.assertEquals("third event", firstPoll.getDescription());
Assert.assertNull(eventBus.poll(TOPIC_1, SUBSCRIBER_1).toCompletableFuture().join());
eventBus.subscribeToEventsAfter(TOPIC_1, SUBSCRIBER_1, 0).toCompletableFuture().join();
final Event secondPoll = eventBus.poll(TOPIC_1, SUBSCRIBER_1).toCompletableFuture().join();
Assert.assertEquals("first event", secondPoll.getDescription());
eventBus.subscribeToEventsAfter(TOPIC_1, SUBSCRIBER_1, firstEvent.getCreationTime() + Duration.ofSeconds(5).toNanos()).toCompletableFuture().join();
final Event thirdPoll = eventBus.poll(TOPIC_1, SUBSCRIBER_1).toCompletableFuture().join();
Assert.assertEquals("second event", thirdPoll.getDescription());
eventBus.subscribeToEventsAfter(TOPIC_1, SUBSCRIBER_1, thirdEvent.getCreationTime() + Duration.ofNanos(1).toNanos()).toCompletableFuture().join();
Assert.assertNull(eventBus.poll(TOPIC_1, SUBSCRIBER_1).toCompletableFuture().join());
}
@Test
public void idempotency() {
final EventBus eventBus = new EventBus(keyedExecutor, broadcastExecutor, timer);
eventBus.subscribeForPull(TOPIC_1, SUBSCRIBER_1, (event) -> true).toCompletableFuture().join();
Event event1 = new Gson().fromJson("{\n" +
" \"id\": \"event-5435\",\n" +
" \"publisher\": \"random-publisher-1\",\n" +
" \"eventType\": \"LOGGING\",\n" +
" \"description\": \"random-event-1\",\n" +
" \"creationTime\": 31884739810179363\n" +
"}", Event.class);
eventBus.publish(TOPIC_1, event1);
Event event2 = new Gson().fromJson("{\n" +
" \"id\": \"event-5435\",\n" +
" \"publisher\": \"random-publisher-2\",\n" +
" \"eventType\": \"PRIORITY\",\n" +
" \"description\": \"random-event-2\",\n" +
" \"creationTime\": 31824735510179363\n" +
"}", Event.class);
eventBus.publish(TOPIC_1, event2);
final Event firstEvent = eventBus.poll(TOPIC_1, SUBSCRIBER_1).toCompletableFuture().join();
Assert.assertEquals(EventType.LOGGING, firstEvent.getEventType());
Assert.assertEquals("random-event-1", firstEvent.getDescription());
Assert.assertNull(eventBus.poll(TOPIC_1, SUBSCRIBER_1).toCompletableFuture().join());
}
@Test
public void unsubscribePushEvents() {
final EventBus eventBus = new EventBus(keyedExecutor, broadcastExecutor, timer);
final List<Event> topic1 = new ArrayList<>(), topic2 = new ArrayList<>();
eventBus.subscribeForPush(TOPIC_1, SUBSCRIBER_1, event -> true, event -> {
topic1.add(event);
return CompletableFuture.completedStage(null);
}, 0).toCompletableFuture().join();
eventBus.subscribeForPush(TOPIC_2, SUBSCRIBER_1, event -> true, event -> {
topic2.add(event);
return CompletableFuture.completedStage(null);
}, 0).toCompletableFuture().join();
for (int i = 0; i < 3; i++) {
eventBus.publish(TOPIC_1, constructEvent(EventType.PRIORITY, UUID.randomUUID().toString())).toCompletableFuture().join();
}
eventBus.publish(TOPIC_2, constructEvent(EventType.PRIORITY, UUID.randomUUID().toString())).toCompletableFuture().join();
eventBus.unsubscribe(TOPIC_1, SUBSCRIBER_1);
Assert.assertEquals(3, topic1.size());
Assert.assertEquals(1, topic2.size());
for (int i = 0; i < 2; i++) {
eventBus.publish(TOPIC_2, constructEvent(EventType.PRIORITY, UUID.randomUUID().toString())).toCompletableFuture().join();
}
for (int i = 0; i < 3; i++) {
eventBus.publish(TOPIC_1, constructEvent(EventType.PRIORITY, UUID.randomUUID().toString())).toCompletableFuture().join();
}
Assert.assertEquals(3, topic1.size());
Assert.assertEquals(3, topic2.size());
eventBus.subscribeForPush(TOPIC_1, SUBSCRIBER_1, event -> true, event -> {
topic1.add(event);
return CompletableFuture.completedStage(null);
}, 0).toCompletableFuture().join();
for (int i = 0; i < 3; i++) {
eventBus.publish(TOPIC_1, constructEvent(EventType.PRIORITY, UUID.randomUUID().toString())).toCompletableFuture().join();
}
Assert.assertEquals(6, topic1.size());
Assert.assertEquals(3, topic2.size());
}
@Test
public void unsubscribePullEvents() {
final EventBus eventBus = new EventBus(keyedExecutor, broadcastExecutor, timer);
eventBus.subscribeForPull(TOPIC_1, SUBSCRIBER_1, event -> true).toCompletableFuture().join();
eventBus.subscribeForPull(TOPIC_2, SUBSCRIBER_1, event -> true).toCompletableFuture().join();
for (int i = 0; i < 3; i++) {
eventBus.publish(TOPIC_1, constructEvent(EventType.PRIORITY, UUID.randomUUID().toString())).toCompletableFuture().join();
}
eventBus.publish(TOPIC_2, constructEvent(EventType.PRIORITY, UUID.randomUUID().toString())).toCompletableFuture().join();
for (int i = 0; i < 3; i++) {
Assert.assertNotNull(eventBus.poll(TOPIC_1, SUBSCRIBER_1).toCompletableFuture().join());
}
Assert.assertNotNull(eventBus.poll(TOPIC_2, SUBSCRIBER_1).toCompletableFuture().join());
eventBus.unsubscribe(TOPIC_1, SUBSCRIBER_1).toCompletableFuture().join();
for (int i = 0; i < 2; i++) {
eventBus.publish(TOPIC_2, constructEvent(EventType.PRIORITY, UUID.randomUUID().toString())).toCompletableFuture().join();
}
for (int i = 0; i < 3; i++) {
eventBus.publish(TOPIC_1, constructEvent(EventType.PRIORITY, UUID.randomUUID().toString())).toCompletableFuture().join();
}
Assert.assertTrue(eventBus.poll(TOPIC_1, SUBSCRIBER_1)
.handle((__, throwable) -> throwable.getCause() instanceof UnsubscribedPollException).toCompletableFuture().join());
for (int i = 0; i < 2; i++) {
Assert.assertNotNull(eventBus.poll(TOPIC_2, SUBSCRIBER_1).toCompletableFuture().join());
}
eventBus.subscribeForPull(TOPIC_1, SUBSCRIBER_1, event -> true).toCompletableFuture().join();
for (int i = 0; i < 3; i++) {
eventBus.publish(TOPIC_1, constructEvent(EventType.PRIORITY, UUID.randomUUID().toString())).toCompletableFuture().join();
}
for (int i = 0; i < 3; i++) {
Assert.assertNotNull(eventBus.poll(TOPIC_1, SUBSCRIBER_1).toCompletableFuture().join());
}
Assert.assertNull(eventBus.poll(TOPIC_2, SUBSCRIBER_1).toCompletableFuture().join());
}
@Test
public void deadLetterQueue() {
final EventBus eventBus = new EventBus(keyedExecutor, broadcastExecutor, timer);
final EventBus dlq = new EventBus(new KeyedExecutor<>(3), new KeyedExecutor<>(3), new Timer());
eventBus.setDeadLetterQueue(dlq);
dlq.subscribeForPull(TOPIC_1, SUBSCRIBER_1, event -> event.getEventType().equals(EventType.ERROR));
final AtomicLong attempts = new AtomicLong();
final int maxTries = 5;
eventBus.subscribeForPush(TOPIC_1, SUBSCRIBER_1, event -> true, event -> {
attempts.incrementAndGet();
return CompletableFuture.failedStage(new RuntimeException());
}, maxTries).toCompletableFuture().join();
final Event event = new Event(PUBLISHER_1, EventType.LOGGING, "random", timer.getCurrentTime());
eventBus.publish(TOPIC_1, event).toCompletableFuture().join();
Assert.assertEquals(5, attempts.intValue());
final Event failureEvent = dlq.poll(TOPIC_1, SUBSCRIBER_1).toCompletableFuture().join();
Assert.assertTrue(failureEvent instanceof FailureEvent);
Assert.assertEquals(event.getId(), ((FailureEvent) failureEvent).getEvent().getId());
Assert.assertEquals(EventType.ERROR, failureEvent.getEventType());
Assert.assertTrue(((FailureEvent) failureEvent).getThrowable().getCause() instanceof RetryLimitExceededException);
}
@Test
public void retrySuccess() {
final EventBus eventBus = new EventBus(keyedExecutor, broadcastExecutor, timer);
final AtomicLong attempts = new AtomicLong();
final int maxTries = 5;
final List<Event> events = new ArrayList<>();
eventBus.subscribeForPush(TOPIC_1, SUBSCRIBER_1, event -> true, event -> {
if (attempts.incrementAndGet() == maxTries) {
events.add(event);
return CompletableFuture.completedStage(null);
} else {
return CompletableFuture.failedStage(new RuntimeException("TRY no: " + attempts.intValue()));
}
}, maxTries).toCompletableFuture().join();
eventBus.publish(TOPIC_1, new Event(PUBLISHER_1, EventType.LOGGING, "random", timer.getCurrentTime())).toCompletableFuture().join();
Assert.assertEquals(EventType.LOGGING, events.get(0).getEventType());
Assert.assertEquals("random", events.get(0).getDescription());
Assert.assertEquals(5, attempts.intValue());
Assert.assertEquals(1, events.size());
}
@Test
public void preconditionCheckForPush() {
final EventBus eventBus = new EventBus(keyedExecutor, broadcastExecutor, timer);
final List<Event> events = new ArrayList<>();
eventBus.subscribeForPush(TOPIC_1, SUBSCRIBER_1, event -> event.getDescription().contains("-1"), event -> {
events.add(event);
return CompletableFuture.completedStage(null);
}, 0).toCompletableFuture().join();
eventBus.publish(TOPIC_1, new Event(PUBLISHER_1, EventType.LOGGING, "random-event-1", timer.getCurrentTime())).toCompletableFuture().join();
eventBus.publish(TOPIC_1, new Event(PUBLISHER_1, EventType.LOGGING, "random-event-2", timer.getCurrentTime())).toCompletableFuture().join();
eventBus.publish(TOPIC_1, new Event(PUBLISHER_1, EventType.LOGGING, "random-event-12", timer.getCurrentTime())).toCompletableFuture().join();
eventBus.publish(TOPIC_1, new Event(PUBLISHER_1, EventType.LOGGING, "random-event-21", timer.getCurrentTime())).toCompletableFuture().join();
Assert.assertEquals(events.size(), 2);
Assert.assertEquals(EventType.LOGGING, events.get(0).getEventType());
Assert.assertEquals("random-event-1", events.get(0).getDescription());
Assert.assertEquals(EventType.LOGGING, events.get(1).getEventType());
Assert.assertEquals("random-event-12", events.get(1).getDescription());
}
@Test
public void preconditionCheckForPull() {
final EventBus eventBus = new EventBus(keyedExecutor, broadcastExecutor, timer);
eventBus.subscribeForPull(TOPIC_1, SUBSCRIBER_1, event -> event.getDescription().contains("-1")).toCompletableFuture().join();
eventBus.publish(TOPIC_1, new Event(PUBLISHER_1, EventType.LOGGING, "random-event-1", timer.getCurrentTime())).toCompletableFuture().join();
eventBus.publish(TOPIC_1, new Event(PUBLISHER_1, EventType.LOGGING, "random-event-2", timer.getCurrentTime())).toCompletableFuture().join();
eventBus.publish(TOPIC_1, new Event(PUBLISHER_1, EventType.LOGGING, "random-event-12", timer.getCurrentTime())).toCompletableFuture().join();
eventBus.publish(TOPIC_1, new Event(PUBLISHER_1, EventType.LOGGING, "random-event-21", timer.getCurrentTime())).toCompletableFuture().join();
final Event event1 = eventBus.poll(TOPIC_1, SUBSCRIBER_1).toCompletableFuture().join();
Assert.assertEquals(EventType.LOGGING, event1.getEventType());
Assert.assertEquals("random-event-1", event1.getDescription());
final Event event2 = eventBus.poll(TOPIC_1, SUBSCRIBER_1).toCompletableFuture().join();
Assert.assertEquals(EventType.LOGGING, event2.getEventType());
Assert.assertEquals("random-event-12", event2.getDescription());
Assert.assertNull(eventBus.poll(TOPIC_1, SUBSCRIBER_1).toCompletableFuture().join());
}
}

View File

@@ -0,0 +1,18 @@
import util.Timer;
public class TestTimer extends Timer {
private long currentTime;
public TestTimer() {
this.currentTime = System.nanoTime();
}
@Override
public long getCurrentTime() {
return currentTime;
}
public void setCurrentTime(final long currentTime) {
this.currentTime = currentTime;
}
}