Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
d9ccf64
Introduce Delta Protocol core classes, including commands (`AddProper…
ftomassetti Oct 19, 2025
38bea10
Introduce new DeltaCommand classes for operations: Add/Delete/Move/Re…
ftomassetti Oct 19, 2025
85c8624
Introduce new DeltaCommand classes for moving and replacing entries a…
ftomassetti Oct 19, 2025
1ddb5b0
Introduce new DeltaCommand classes for composite, add, delete, change…
ftomassetti Oct 19, 2025
8c60ac7
Introduce new DeltaEvent classes for references, annotations, childre…
ftomassetti Oct 20, 2025
2f486da
Refactor DeltaQuery classes: reorganize packages, add `ListPartitions…
ftomassetti Oct 20, 2025
106eff2
Introduce DeltaClient with synchronization support and event handling…
ftomassetti Oct 21, 2025
6eae591
Introduce `DeltaServer` to handle server-side processing of `DeltaEve…
ftomassetti Oct 24, 2025
78c983b
Introduce support for `DeltaCommand` handling in `InMemoryServer` wit…
ftomassetti Oct 24, 2025
3184657
Refactor `InMemoryServer` and `DeltaChannel` to enhance readability a…
ftomassetti Oct 24, 2025
dc6eceb
Introduce error handling for invalid node operations and enhance `Del…
ftomassetti Oct 24, 2025
6890e01
Remove `DeltaInMemoryServer` and redundant event handling code from `…
ftomassetti Oct 24, 2025
2324b09
Rename DeltaQuery classes to follow consistent `Request` suffix namin…
ftomassetti Oct 25, 2025
2c644e6
Refactor `CommonDeltaEvent` to `BaseDeltaEvent`, updating all depende…
ftomassetti Oct 25, 2025
7fc2d66
Replace `DeltaCommandResponse` architecture with `ErrorEvent` handlin…
ftomassetti Oct 25, 2025
14782e9
Add support for `AddChild` DeltaCommand and `ChildAdded` DeltaEvent a…
ftomassetti Oct 26, 2025
2f324a3
Add participation management with `SignOnRequest` and `SignOnResponse…
ftomassetti Oct 27, 2025
25c75a2
Add support for `DeleteChild` DeltaCommand and `ChildDeleted` DeltaEv…
ftomassetti Nov 3, 2025
786f686
Add constructor overloads with `id` parameter to language model entit…
ftomassetti Nov 3, 2025
8dfc3b9
Add indexed reference value support to model layers, implement `AddRe…
ftomassetti Nov 3, 2025
7c86af4
Restore and update property initialization in `DeltaClientAndServerTe…
ftomassetti Nov 3, 2025
a18f122
Add constructor overloads with `id` parameter to `Containment` and `L…
ftomassetti Nov 3, 2025
55ade3e
Add constructor overloads with `id` parameter to `Reference`, refacto…
ftomassetti Nov 3, 2025
df29be6
Refactor tests to streamline `Property` and `Feature` initialization,…
ftomassetti Nov 3, 2025
424cf69
Add `setUnavailableReferenceTargetPolicy` in `DeltaClient` initializa…
ftomassetti Nov 3, 2025
07f21d8
Remove unused constructor overloads in core language model classes (`…
ftomassetti Nov 8, 2025
a9cd8f0
Refactor `Enumeration` and `EnumerationLiteral` initialization to use…
ftomassetti Nov 8, 2025
8ee2339
Remove unused default method `addChild` and redundant `addReferenceVa…
ftomassetti Nov 8, 2025
a3fbf4f
Formatting
ftomassetti Nov 9, 2025
fbddd8e
Restore ProxyNode from main
ftomassetti Nov 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions client/src/main/java/io/lionweb/client/delta/DeltaChannel.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package io.lionweb.client.delta;

import io.lionweb.client.delta.messages.*;
import java.util.function.Function;

/**
* The DeltaChannel must be a specific link between a Client and the Server. Different clients
* should use different DeltaChannels because the clientId must be determined from the channel.
*/
public interface DeltaChannel {
/**
* Queries initiated/requested by the client, with synchronous response by the repository. A query
* requests some information from the repository without changing the repository’s contents. The
* repository gathers all information needed to answer the query, and sends the information back.
* The repository might reply invalid queries with a failure message. We also use queries for
* managing participations.
*/
DeltaQueryResponse sendQuery(Function<String, DeltaQuery> queryProducer);

/**
* Commands initiated/requested by the client, with synchronous response by the repository. A
* command requests some change to the repository. The repository quickly confirms having received
* the command, or rejects a failed command.[5] However, the repository processes the command
* asynchronously, and eventually broadcasts the effect(s) as event.
*/
void sendCommand(String participationId, Function<String, DeltaCommand> commandProducer);

void sendEvent(Function<Integer, DeltaEvent> eventProducer);

void registerEventReceiver(DeltaEventReceiver deltaEventReceiver);

void unregisterEventReceiver(DeltaEventReceiver deltaEventReceiver);

void registerCommandReceiver(DeltaCommandReceiver deltaCommandReceiver);

void unregisterCommandReceiver(DeltaCommandReceiver deltaCommandReceiver);

void registerQueryReceiver(DeltaQueryReceiver deltaQueryReceiver);

void unregisterQueryReceiver(DeltaQueryReceiver deltaQueryReceiver);

void registerQueryResponseReceiver(DeltaQueryResponseReceiver deltaQueryResponseReceiver);

void unregisterQueryResponseReceiver(DeltaQueryResponseReceiver deltaQueryResponseReceiver);
}
321 changes: 321 additions & 0 deletions client/src/main/java/io/lionweb/client/delta/DeltaClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,321 @@
package io.lionweb.client.delta;

import io.lionweb.LionWebVersion;
import io.lionweb.client.delta.messages.BaseDeltaEvent;
import io.lionweb.client.delta.messages.DeltaEvent;
import io.lionweb.client.delta.messages.DeltaQueryResponse;
import io.lionweb.client.delta.messages.commands.children.AddChild;
import io.lionweb.client.delta.messages.commands.children.DeleteChild;
import io.lionweb.client.delta.messages.commands.properties.ChangeProperty;
import io.lionweb.client.delta.messages.commands.references.AddReference;
import io.lionweb.client.delta.messages.events.ErrorEvent;
import io.lionweb.client.delta.messages.events.children.ChildAdded;
import io.lionweb.client.delta.messages.events.children.ChildDeleted;
import io.lionweb.client.delta.messages.events.properties.PropertyChanged;
import io.lionweb.client.delta.messages.events.references.ReferenceAdded;
import io.lionweb.client.delta.messages.queries.partitcipations.SignOnRequest;
import io.lionweb.client.delta.messages.queries.partitcipations.SignOnResponse;
import io.lionweb.language.Containment;
import io.lionweb.language.Property;
import io.lionweb.language.Reference;
import io.lionweb.model.*;
import io.lionweb.model.impl.ProxyNode;
import io.lionweb.serialization.*;
import io.lionweb.serialization.data.MetaPointer;
import io.lionweb.serialization.data.SerializationChunk;
import java.lang.ref.WeakReference;
import java.util.*;
import org.jetbrains.annotations.NotNull;

public class DeltaClient implements DeltaEventReceiver, DeltaQueryResponseReceiver {
private LionWebVersion lionWebVersion;
private DeltaChannel channel;
private MonitoringObserver observer = new MonitoringObserver();
private String participationId;
private HashMap<String, Set<WeakReference<ClassifierInstance<?>>>> nodes = new HashMap<>();
private PrimitiveValuesSerialization primitiveValuesSerialization =
new PrimitiveValuesSerialization();
private AbstractSerialization serialization;
private Set<String> queriesSent = new HashSet<>();
private String clientId;

public DeltaClient(DeltaChannel channel, String clientId) {
this(LionWebVersion.currentVersion, channel, clientId);
}

public DeltaClient(LionWebVersion lionWebVersion, DeltaChannel channel, String clientId) {
this.clientId = clientId;
this.lionWebVersion = lionWebVersion;
this.channel = channel;
this.channel.registerEventReceiver(this);
this.channel.registerQueryResponseReceiver(this);
this.primitiveValuesSerialization.registerLionBuiltinsPrimitiveSerializersAndDeserializers(
lionWebVersion);
this.serialization = SerializationProvider.getStandardJsonSerialization(lionWebVersion);
this.serialization.setUnavailableParentPolicy(UnavailableNodePolicy.PROXY_NODES);
this.serialization.setUnavailableReferenceTargetPolicy(UnavailableNodePolicy.PROXY_NODES);
}

/**
* It is responsibility of the caller to ensure that the partition is initially in sync with the
* server.
*/
public void monitor(@NotNull Node partition) {
Objects.requireNonNull(partition, "partition should not be null");
synchronized (partition) {
partition
.thisAndAllDescendants()
.forEach(
n ->
nodes
.computeIfAbsent(n.getID(), id -> new HashSet<>())
.add(new WeakReference<>(n)));
partition.registerPartitionObserver(observer);
}
}

protected void monitorNode(Node node) {
nodes.computeIfAbsent(node.getID(), id -> new HashSet<>()).add(new WeakReference<>(node));
}

@Override
public void receiveEvent(DeltaEvent event) {
if (event instanceof BaseDeltaEvent) {
BaseDeltaEvent<?> baseDeltaEvent = (BaseDeltaEvent<?>) event;

if (baseDeltaEvent.originCommands.stream()
.anyMatch(
(CommandSource originCommand) ->
originCommand.participationId.equals(this.participationId))) {
return;
}
}
observer.paused = true;
if (event instanceof PropertyChanged) {
PropertyChanged propertyChanged = (PropertyChanged) event;
Set<WeakReference<ClassifierInstance<?>>> matchingNodes =
this.nodes.get(propertyChanged.node);
if (matchingNodes != null) {
for (WeakReference<ClassifierInstance<?>> classifierInstanceRef : matchingNodes) {
ClassifierInstance<?> classifierInstance = classifierInstanceRef.get();
if (classifierInstance != null) {
ClassifierInstanceUtils.setPropertyValueByMetaPointer(
classifierInstance, propertyChanged.property, propertyChanged.newValue);
}
}
}
} else if (event instanceof ChildAdded) {
ChildAdded childAdded = (ChildAdded) event;
for (WeakReference<ClassifierInstance<?>> classifierInstanceRef :
nodes.get(childAdded.parent)) {
ClassifierInstance<?> classifierInstance = classifierInstanceRef.get();
if (classifierInstance != null) {
Node child =
(Node) serialization.deserializeSerializationChunk(childAdded.newChild).get(0);
monitorNode(child);
Containment containment =
classifierInstance
.getClassifier()
.getContainmentByMetaPointer(childAdded.containment);
if (containment == null) {
throw new IllegalStateException(
"Containment not found for "
+ classifierInstance
+ " using metapointer "
+ childAdded.containment);
}
classifierInstance.addChild(containment, child, childAdded.index);
}
}
} else if (event instanceof ChildDeleted) {
ChildDeleted childDeleted = (ChildDeleted) event;
for (WeakReference<ClassifierInstance<?>> classifierInstanceRef :
nodes.get(childDeleted.parent)) {
ClassifierInstance<?> classifierInstance = classifierInstanceRef.get();
if (classifierInstance != null) {
Containment containment =
classifierInstance
.getClassifier()
.getContainmentByMetaPointer(childDeleted.containment);
if (containment == null) {
throw new IllegalStateException(
"Containment not found for "
+ classifierInstance
+ " using metapointer "
+ childDeleted.containment);
}
classifierInstance.removeChild(containment, childDeleted.index);
}
}
} else if (event instanceof ErrorEvent) {
ErrorEvent errorEvent = (ErrorEvent) event;
observer.paused = false;
throw new ErrorEventReceivedException(errorEvent.errorCode, errorEvent.message);
} else if (event instanceof ReferenceAdded) {
ReferenceAdded referenceAdded = (ReferenceAdded) event;
for (WeakReference<ClassifierInstance<?>> classifierInstanceRef :
nodes.get(referenceAdded.parent)) {
ClassifierInstance<?> classifierInstance = classifierInstanceRef.get();
if (classifierInstance != null) {
Reference reference =
classifierInstance
.getClassifier()
.getReferenceByMetaPointer(referenceAdded.reference);
if (reference == null) {
throw new IllegalStateException(
"Reference not found for "
+ classifierInstance
+ " using metapointer "
+ referenceAdded.reference);
}
classifierInstance.addReferenceValue(
reference,
referenceAdded.index,
new ReferenceValue(
new ProxyNode(referenceAdded.newTarget), referenceAdded.newResolveInfo));
}
}
} else {
observer.paused = false;
throw new UnsupportedOperationException(
"Unsupported event type: " + event.getClass().getName());
}
observer.paused = false;
}

private class MonitoringObserver implements PartitionObserver {

public boolean paused = false;

@Override
public void propertyChanged(
ClassifierInstance<?> classifierInstance,
Property property,
Object oldValue,
Object newValue) {
if (paused) return;
channel.sendCommand(
participationId,
commandId ->
new ChangeProperty(
commandId,
classifierInstance.getID(),
MetaPointer.from(property),
primitiveValuesSerialization.serialize(property.getType().getID(), newValue)));
}

@Override
public void childAdded(
ClassifierInstance<?> classifierInstance,
Containment containment,
int index,
Node newChild) {
if (paused) return;
SerializationChunk chunk = serialization.serializeNodesToSerializationChunk(newChild);
if (newChild.getID() == null) {
throw new IllegalStateException("Child id must not be null");
}
channel.sendCommand(
participationId,
commandId ->
new AddChild(
commandId,
classifierInstance.getID(),
chunk,
MetaPointer.from(containment),
index));
}

@Override
public void childRemoved(
ClassifierInstance<?> classifierInstance,
Containment containment,
int index,
@NotNull Node removedChild) {
if (paused) return;
Objects.requireNonNull(removedChild, "removedChild must not be null");
String removedChildId = removedChild.getID();
Objects.requireNonNull(removedChildId, "removedChildId must not be null");
channel.sendCommand(
participationId,
commandId ->
new DeleteChild(
commandId,
classifierInstance.getID(),
MetaPointer.from(containment),
index,
removedChildId));
}

@Override
public void annotationAdded(
ClassifierInstance<?> node, int index, AnnotationInstance newAnnotation) {
throw new UnsupportedOperationException("Not supported yet.");
}

@Override
public void annotationRemoved(
ClassifierInstance<?> node, int index, AnnotationInstance removedAnnotation) {
throw new UnsupportedOperationException("Not supported yet.");
}

@Override
public void referenceValueAdded(
ClassifierInstance<?> classifierInstance,
Reference reference,
int index,
ReferenceValue referenceValue) {
if (paused) return;
channel.sendCommand(
participationId,
commandId ->
new AddReference(
commandId,
classifierInstance.getID(),
MetaPointer.from(reference),
index,
referenceValue.getReferredID(),
referenceValue.getResolveInfo()));
}

@Override
public void referenceValueChanged(
ClassifierInstance<?> classifierInstance,
Reference reference,
int index,
String oldReferred,
String oldResolveInfo,
String newReferred,
String newResolveInfo) {
throw new UnsupportedOperationException("Not supported yet.");
}

@Override
public void referenceValueRemoved(
ClassifierInstance<?> classifierInstance,
Reference reference,
int index,
ReferenceValue referenceValue) {
throw new UnsupportedOperationException("Not supported yet.");
}
}

@Override
public void receiveQueryResponse(DeltaQueryResponse queryResponse) {
if (!queriesSent.contains(queryResponse.queryId)) return;
if (queryResponse instanceof SignOnResponse) {
SignOnResponse signOnResponse = (SignOnResponse) queryResponse;
this.participationId = signOnResponse.participationId;
return;
}
throw new UnsupportedOperationException("Not supported yet.");
}

public void sendSignOnRequest() {
channel.sendQuery(
queryId -> {
queriesSent.add(queryId);
return new SignOnRequest(queryId, DeltaProtocolVersion.v2025_1, clientId);
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.lionweb.client.delta;

import io.lionweb.client.delta.messages.DeltaCommand;

public interface DeltaCommandReceiver {

void receiveCommand(String participationId, DeltaCommand command);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.lionweb.client.delta;

import io.lionweb.client.delta.messages.DeltaEvent;

public interface DeltaEventReceiver {

void receiveEvent(DeltaEvent event);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package io.lionweb.client.delta;

import io.lionweb.client.delta.messages.DeltaQuery;
import io.lionweb.client.delta.messages.DeltaQueryResponse;

public interface DeltaQueryReceiver {

DeltaQueryResponse receiveQuery(DeltaQuery query);
}
Loading