csep-2025/client/src/main/java/client/utils/WebSocketDataService.java

58 lines
2 KiB
Java

package client.utils;
import client.exception.InvalidModificationException;
import commons.ws.messages.Message;
import org.apache.commons.lang3.function.FailableFunction;
import org.apache.commons.lang3.tuple.Pair;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
public class WebSocketDataService<ID, Value> {
public WebSocketDataService() {
}
/**
* dataRegister maps a CompletableFuture&lt;T&gt; to its fulfilment function.
*/
private final Map<ID, CompletableFuture<Value>> pendingRegister = new HashMap<>();
private Function<Message, FailableFunction<Message, Pair<ID, Value>, InvalidModificationException>> parser;
public void setMessageParser(
Function<
Message,
FailableFunction<Message, Pair<ID, Value>, InvalidModificationException>
> parser) {
this.parser = parser;
}
/**
* On each WS message, the parser callback distinguishes what
* consumer (crud function) to call. Then the pair of ID-Value
* returned by the processor is used
* @param message
*/
public void onMessage(Message message) {
Pair<ID, Value> result = parser.apply(message).apply(message);
pendingRegister.get(result.getKey()).complete(result.getValue());
}
/**
* Adds an ID-reference to the map for an object whose update is pending.
* If the key already exists, the caller should assume there is an ongoing
* operation on the object.
* @param id ID of the object.
* @return Whether the key already exists.
*/
public boolean add(ID id, Consumer<Value> onComplete) {
CompletableFuture<Value> future = new CompletableFuture<>();
future.thenAccept(onComplete.andThen(_ -> pendingRegister.remove(id)));
return pendingRegister.putIfAbsent(id, future) == null;
}
public boolean add(ID id) {
return add(id, (_) -> {});
}
}