Connection extraField Enrichment. #1929

Open
VETRIVEL001 opened this issue Apr 29, 2024 · 7 comments

@VETRIVEL001

VETRIVEL001 commented Apr 29, 2024

I configured the following RQL query in the topics field of a target in my Kafka connection: "_/_/things/twin/events?filter=and(like(thingId,"org.eclipse.ditto:myThing*"),exists(features/lamp))&extraFields=thingId,features/water-tank/properties,attributes/location".
I need events to be sent to Kafka only when features/lamp is modified, but as soon as I add &extraFields= to the query, events are sent whenever anything in the thing changes.
Can anyone help me with this? When a particular field of the thing is modified, I want to send events enriched with the extraFields.

@thjaeckle
Member

@VETRIVEL001 as long as you don't specify features or features/lamp in the extraFields, the behavior should be as you expect it to be:

  • only if the lamp feature was modified, an event should be published

If this is not the case, please share a reproducer, e.g. the complete connection configuration, one change to a Thing you expect to "pass" the filter, and one change you expect to "not pass" the filter.
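The condition about extraFields can be checked mechanically against a topic string. Below is a minimal Python sketch, assuming the topic uses ordinary query-string syntax and that only an extra field equal to the filtered path, or an ancestor of it, matters (that is all the comment above states). The function names are made up for illustration and are not part of Ditto.

```python
from urllib.parse import parse_qs


def extra_fields(topic: str) -> list[str]:
    """Return the extraFields pointers declared in a target topic string."""
    _, _, query = topic.partition("?")
    values = parse_qs(query).get("extraFields", [])
    return [field.strip("/") for value in values for field in value.split(",") if field]


def covers(extra_field: str, filtered_path: str) -> bool:
    """True if extra_field equals filtered_path or is one of its ancestors."""
    return filtered_path == extra_field or filtered_path.startswith(extra_field + "/")


def overlapping_extra_fields(topic: str, filtered_path: str) -> list[str]:
    """List the extra fields that would also contain the filtered path."""
    return [field for field in extra_fields(topic) if covers(field, filtered_path)]


# Topic string taken from this issue.
TOPIC = (
    "_/_/things/twin/events"
    "?filter=and(like(thingId,'org.eclipse.ditto:myThing:*'),exists(features/lamp))"
    "&extraFields=thingId,features/water-tank/properties,attributes/location"
)

print(overlapping_extra_fields(TOPIC, "features/lamp"))
```

For the topic from this issue the list is empty, so the extraFields do not include features or features/lamp.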

@VETRIVEL001
Author

VETRIVEL001 commented Apr 30, 2024

This is my thing:
{
  "thingId": "org.eclipse.ditto:myThing12345",
  "policyId": "org.eclipse.ditto:myThing12345",
  "attributes": {
    "manufacturer": "ACME demo corp.",
    "location": "Berlin, main floor",
    "serialno": "42",
    "model": "Speaking coffee machine"
  },
  "features": {
    "water-tank": {
      "properties": {},
      "desiredProperties": {}
    },
    "lamp": {
      "properties": {},
      "desiredProperties": {}
    }
  }
}
And this is my connection:
{
  "id": "werkjhgf",
  "name": "KAFKA",
  "connectionType": "kafka",
  "connectionStatus": "open",
  "uri": "tcp://ip:9092",
  "sources": [
    {
      "addresses": [
        "topic-123"
      ],
      "consumerCount": 1,
      "qos": 0,
      "authorizationContext": [
        "nginx:ditto"
      ],
      "headerMapping": {},
      "payloadMapping": [
        "Ditto"
      ],
      "replyTarget": {
        "address": "topic-Reply-123",
        "headerMapping": {},
        "expectedResponseTypes": [
          "response",
          "error"
        ],
        "enabled": true
      }
    }
  ],
  "targets": [
    {
      "address": "eventTopic",
      "topics": [
        "_/_/things/twin/events?filter=and(like(thingId,'org.eclipse.ditto:myThing:*'),exists(features/lamp))&extraFields=thingId,features/water-tank/properties,attributes/location"
      ],
      "qos": 0,
      "authorizationContext": [
        "nginx:ditto"
      ],
      "headerMapping": {}
    }
  ],
  "clientCount": 1,
  "failoverEnabled": true,
  "validateCertificates": false,
  "processorPoolSize": 1,
  "specificConfig": {
    "saslMechanism": "plain",
    "bootstrapServers": "kafka_ip:port"
  },
  "mappingDefinitions": {
    "LexiMapper": {
      "mappingEngine": "",
      "options": {
        "incomingScript": "",
        "outgoingScript": ""
      }
    }
  },
  "tags": []
}
This is how I created the connection in my Ditto instance. I expect it to publish the event with the extraFields "thingId,features/water-tank/properties,attributes/location" only if I change something under the lamp feature, but it publishes an event on the topic whenever anything in the thing changes.
I want to publish events with extraFields only when the specified feature or field changes. Can anyone tell me how to do this?

@thjaeckle
Member

@VETRIVEL001 I cannot reproduce this.

What I did:

  • Created the connection like you did with:
"topics": [
  "_/_/things/twin/events?filter=and(like(thingId,'org.eclipse.ditto:myThing:*'),exists(features/lamp))&extraFields=thingId,features/water-tank/properties,attributes/location"
],
  • Then I created the Thing (notice that your filter like(thingId,'org.eclipse.ditto:myThing:*') requires another : in the thing ID):
PUT /api/2/things/org.eclipse.ditto:myThing:123

with payload:

{
  "attributes": {
    "manufacturer": "ACME demo corp.",
    "location": "Berlin, main floor",
    "serialno": "42",
    "model": "Speaking coffee machine"
  },
  "features": {
    "water-tank": {
      "properties": {},
      "desiredProperties": {}
    },
    "lamp": {
      "properties": {},
      "desiredProperties": {}
    }
  }
}
  • That triggered the connection to publish an event (because features/lamp was contained in the payload).
  • Then I updated only the feature water-tank with:
PUT /api/2/things/org.eclipse.ditto:myThing:123/features/water-tank

Any payload:

{
   "properties": {
      "foo": "bar"
   }
}
  • Which did - as expected - not send out an event via the connection - as the lamp feature was not updated.
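The requests from these steps can be scripted for repeatable testing. The following is a minimal Python sketch using only the standard library. The base URL http://localhost:8080 and the ditto/ditto credentials are assumptions for a local default setup, and build_put and send are helper names invented here. The lamp update is not one of the steps above; it is added as the counterpart change that is expected to pass the filter.

```python
import base64
import json
import urllib.request

BASE_URL = "http://localhost:8080"   # assumption: local Ditto behind nginx
USER, PASSWORD = "ditto", "ditto"    # assumption: default demo credentials
THING_ID = "org.eclipse.ditto:myThing:123"


def build_put(path: str, payload: dict) -> urllib.request.Request:
    """Build (but do not send) an authenticated JSON PUT request."""
    token = base64.b64encode(f"{USER}:{PASSWORD}".encode()).decode()
    return urllib.request.Request(
        BASE_URL + path,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json", "Authorization": f"Basic {token}"},
        method="PUT",
    )


def send(request: urllib.request.Request) -> int:
    """Send a prepared request and return the HTTP status code."""
    with urllib.request.urlopen(request) as response:
        return response.status


empty_feature = {"properties": {}, "desiredProperties": {}}

# Step 1: create the Thing; the payload contains features/lamp.
create_thing = build_put(f"/api/2/things/{THING_ID}", {
    "attributes": {
        "manufacturer": "ACME demo corp.",
        "location": "Berlin, main floor",
        "serialno": "42",
        "model": "Speaking coffee machine",
    },
    "features": {"water-tank": empty_feature, "lamp": empty_feature},
})

# Step 2: update only water-tank; this should not pass the filter.
update_water_tank = build_put(
    f"/api/2/things/{THING_ID}/features/water-tank", {"properties": {"foo": "bar"}}
)

# Counter-check (not in the steps above): update lamp; this should pass.
update_lamp = build_put(
    f"/api/2/things/{THING_ID}/features/lamp", {"properties": {"foo": "bar"}}
)
```

Nothing is sent when the block is loaded. Calling send(create_thing), then send(update_water_tank) and send(update_lamp) against a running instance performs the actual requests.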

@VETRIVEL001
Author

I followed the same steps you did.
Connection:
"topics": [
  "_/_/things/twin/events?filter=and(like(thingId,'org.eclipse.ditto:myThing:*'),exists(features/lamp))&extraFields=thingId,features/water-tank/properties,attributes/location"
],

PUT /api/2/things/org.eclipse.ditto:myThing:123/features/water-tank
I tried both PUT and PATCH.
Payload:
{
  "properties": {
    "foo": "bar"
  }
}
But it still sends out an event on Kafka; the same issue persists.

@thjaeckle
Member

Do you maybe still have another Kafka connection in Ditto which does not have the same filter?

@VETRIVEL001
Author

No, I have only one connection.

@dimabarbul
Contributor

Hello @VETRIVEL001

I tried to replicate the issue, but I wasn't able to. I use Ditto 3.5.4.

I basically did the same steps as you:

  • created thing with lamp feature provided
  • updated (with PUT) water-tank feature
  • updated (with PUT) lamp feature

Before doing that I enabled logs for the connection and here is what I see in Ditto UI:
[Screenshot of the connection logs in the Ditto UI: 2024-05-03-122852_912x454_scrot]
Creating the thing and updating the lamp feature each produced a series of log entries of types dispatched, filtered, mapped and published, while updating the water-tank feature produced only a log entry of type dispatched, which is the expected behavior.

Connection logs as JSON
{
  "connectionId": "1929-kafka",
  "connectionLogs": [
    {
      "correlationId": "8dda0473-9ea8-43ff-bf28-d4a476ffae11",
      "timestamp": "2024-05-03T09:23:13.366877149Z",
      "category": "connection",
      "type": "other",
      "level": "success",
      "message": "Successfully reset the logs.",
      "entityType": "connection",
      "entityId": "1929-kafka"
    },
    {
      "correlationId": "24767649-4f65-4874-9f21-ad232a19361a",
      "timestamp": "2024-05-03T09:23:17.151495994Z",
      "category": "target",
      "type": "dispatched",
      "level": "success",
      "message": "Successfully dispatched signal. - Message headers: [referer=http://localhost:8080/ui/, sec-fetch-site=same-origin, correlation-id=24767649-4f65-4874-9f21-ad232a19361a, origin=http://localhost:8080, ditto-ackgregator-address=pekko://ditto-cluster@172.18.0.9:2551/user/$Yb/ackr0-24767649-4f65-4874-9f21-ad232a19361a#16254017, requested-acks=[\"twin-persisted\"], authorization=Basic ZGl0dG86ZGl0dG8=, ditto-originator=nginx:ditto, response-required=false, host=localhost:8080, ditto-read-subjects=[\"nginx:ditto\"], sec-fetch-mode=cors, if-none-match=*, x-ditto-pre-authenticated=nginx:ditto, cookie=.AspNetCore.Antiforgery.P_Dp5CLNiMg=CfDJ8EsrvQfSA0lDjqCxsYQfCHfsIwLKzeW0GSHtFQ-_km-beQOug9wZEm5ctF2ZoQNQ-FypJc2ZWPbI7HQQK_McYdFGOlry7K8LzOkGvQjSO971P9wxTb_pVvG0o0u1NR1i3ljZqJEpEA6SEpI278_Dhl0; .AspNetCore.Antiforgery.8vFUR3_kimI=CfDJ8EsrvQfSA0lDjqCxsYQfCHfO5jmjvCr-hw6-WX5H38gaDdbRs9B-QivzCQjvik0fzWhOh0PVL0SYDqrNr8wIxLJSismJPCv3iocH6K16UkDU6iGpsTHR18J8opgm3GD8-rCqZIslFupzOx1VZ0Mlxog, accept-language=en-US, en;q=0.5, dnt=1, x-forwarded-for=172.18.0.1, pragma=no-cache, accept=*/*, x-real-ip=172.18.0.1, x-forwarded-user=ditto, ditto-auth-context={\"type\":\"pre-authenticated-http\",\"subjects\":[\"nginx:ditto\"]}, sec-fetch-dest=empty, user-agent=Mozilla/5.0 (X11; Linux x86_64; rv:124.0) Gecko/20100101 Firefox/124.0] - Message payload: {\"type\":\"things.events:thingCreated\",\"_timestamp\":\"2024-05-03T09:23:17.128058725Z\",\"_metadata\":null,\"revision\":9,\"thingId\":\"org.eclipse.ditto:myThing:123\",\"thing\":{\"thingId\":\"org.eclipse.ditto:myThing:123\",\"policyId\":\"org.eclipse.ditto:myThing:123\",\"attributes\":{\"manufacturer\":\"ACME demo corp.\",\"location\":\"Berlin, main floor\",\"serialno\":\"42\",\"model\":\"Speaking coffee machine\"},\"features\":{\"water-tank\":{\"properties\":{},\"desiredProperties\":{}},\"lamp\":{\"properties\":{},\"desiredProperties\":{}}}}}",
      "address": "eventTopic",
      "entityType": "thing",
      "entityId": "org.eclipse.ditto:myThing:123"
    },
    {
      "correlationId": "24767649-4f65-4874-9f21-ad232a19361a",
      "timestamp": "2024-05-03T09:23:17.168057238Z",
      "category": "target",
      "type": "filtered",
      "level": "success",
      "message": "Signal successfully passed possible filters.",
      "address": "eventTopic",
      "entityType": "thing",
      "entityId": "org.eclipse.ditto:myThing:123"
    },
    {
      "correlationId": "24767649-4f65-4874-9f21-ad232a19361a",
      "timestamp": "2024-05-03T09:23:17.201507206Z",
      "category": "target",
      "type": "mapped",
      "level": "success",
      "message": "Mapped outgoing signal with mapper <default>",
      "address": "eventTopic",
      "entityType": "thing",
      "entityId": "org.eclipse.ditto:myThing:123"
    },
    {
      "correlationId": "24767649-4f65-4874-9f21-ad232a19361a",
      "timestamp": "2024-05-03T09:23:17.254721551Z",
      "category": "target",
      "type": "published",
      "level": "success",
      "message": "Successfully published signal. - Message headers: [content-type=application/vnd.eclipse.ditto+json, correlation-id=24767649-4f65-4874-9f21-ad232a19361a] - Message payload: {\"topic\":\"org.eclipse.ditto/myThing:123/things/twin/events/created\",\"headers\":{\"sec-fetch-mode\":\"cors\",\"referer\":\"http://localhost:8080/ui/\",\"x-ditto-pre-authenticated\":\"nginx:ditto\",\"sec-fetch-site\":\"same-origin\",\"cookie\":\".AspNetCore.Antiforgery.P_Dp5CLNiMg=CfDJ8EsrvQfSA0lDjqCxsYQfCHfsIwLKzeW0GSHtFQ-_km-beQOug9wZEm5ctF2ZoQNQ-FypJc2ZWPbI7HQQK_McYdFGOlry7K8LzOkGvQjSO971P9wxTb_pVvG0o0u1NR1i3ljZqJEpEA6SEpI278_Dhl0; .AspNetCore.Antiforgery.8vFUR3_kimI=CfDJ8EsrvQfSA0lDjqCxsYQfCHfO5jmjvCr-hw6-WX5H38gaDdbRs9B-QivzCQjvik0fzWhOh0PVL0SYDqrNr8wIxLJSismJPCv3iocH6K16UkDU6iGpsTHR18J8opgm3GD8-rCqZIslFupzOx1VZ0Mlxog\",\"accept-language\":\"en-US, en;q=0.5\",\"origin\":\"http://localhost:8080\",\"dnt\":\"1\",\"x-forwarded-for\":\"172.18.0.1\",\"pragma\":\"no-cache\",\"accept\":\"*/*\",\"authorization\":\"Basic ZGl0dG86ZGl0dG8=\",\"x-real-ip\":\"172.18.0.1\",\"x-forwarded-user\":\"ditto\",\"host\":\"localhost:8080\",\"sec-fetch-dest\":\"empty\",\"user-agent\":\"Mozilla/5.0 (X11; Linux x86_64; rv:124.0) Gecko/20100101 Firefox/124.0\",\"ditto-originator\":\"nginx:ditto\",\"response-required\":false,\"requested-acks\":[],\"content-type\":\"application/json\",\"correlation-id\":\"24767649-4f65-4874-9f21-ad232a19361a\"},\"path\":\"/\",\"value\":{\"thingId\":\"org.eclipse.ditto:myThing:123\",\"policyId\":\"org.eclipse.ditto:myThing:123\",\"attributes\":{\"manufacturer\":\"ACME demo corp.\",\"location\":\"Berlin, main floor\",\"serialno\":\"42\",\"model\":\"Speaking coffee machine\"},\"features\":{\"water-tank\":{\"properties\":{},\"desiredProperties\":{}},\"lamp\":{\"properties\":{},\"desiredProperties\":{}}}},\"extra\":{\"thingId\":\"org.eclipse.ditto:myThing:123\",\"features\":{\"water-tank\":{\"properties\":{}}},\"attributes\":{\"location\":\"Berlin, main floor\"}},\"revision\":9,\"timestamp\":\"2024-05-03T09:23:17.128058725Z\"}",
      "address": "eventTopic"
    },
    {
      "correlationId": "fabbefdc-2d9a-4ffe-b772-87df269cdf90",
      "timestamp": "2024-05-03T09:24:16.594616743Z",
      "category": "target",
      "type": "dispatched",
      "level": "success",
      "message": "Successfully dispatched signal. - Message headers: [x-ditto-pre-authenticated=nginx:ditto, correlation-id=fabbefdc-2d9a-4ffe-b772-87df269cdf90, x-forwarded-for=172.18.0.1, ditto-ackgregator-address=pekko://ditto-cluster@172.18.0.9:2551/user/$3b/ackr0-fabbefdc-2d9a-4ffe-b772-87df269cdf90#-907028526, accept=*/*, requested-acks=[\"twin-persisted\"], authorization=Basic ZGl0dG86ZGl0dG8=, x-real-ip=172.18.0.1, x-forwarded-user=ditto, ditto-originator=nginx:ditto, response-required=false, ditto-auth-context={\"type\":\"pre-authenticated-http\",\"subjects\":[\"nginx:ditto\"]}, host=localhost:8080, ditto-read-subjects=[\"nginx:ditto\"], user-agent=curl/8.7.1] - Message payload: {\"type\":\"things.events:featureModified\",\"_timestamp\":\"2024-05-03T09:24:16.573186811Z\",\"_metadata\":null,\"revision\":10,\"thingId\":\"org.eclipse.ditto:myThing:123\",\"featureId\":\"water-tank\",\"feature\":{\"properties\":{\"foo\":\"bar\"}}}",
      "address": "eventTopic",
      "entityType": "thing",
      "entityId": "org.eclipse.ditto:myThing:123"
    },
    {
      "correlationId": "a2306b00-9c37-4334-a285-bdd520017c7d",
      "timestamp": "2024-05-03T09:24:20.508087532Z",
      "category": "target",
      "type": "dispatched",
      "level": "success",
      "message": "Successfully dispatched signal. - Message headers: [x-ditto-pre-authenticated=nginx:ditto, correlation-id=a2306b00-9c37-4334-a285-bdd520017c7d, x-forwarded-for=172.18.0.1, ditto-ackgregator-address=pekko://ditto-cluster@172.18.0.9:2551/user/$4b/ackr0-a2306b00-9c37-4334-a285-bdd520017c7d#-1885240064, accept=*/*, requested-acks=[\"twin-persisted\"], authorization=Basic ZGl0dG86ZGl0dG8=, x-real-ip=172.18.0.1, x-forwarded-user=ditto, ditto-originator=nginx:ditto, response-required=false, ditto-auth-context={\"type\":\"pre-authenticated-http\",\"subjects\":[\"nginx:ditto\"]}, host=localhost:8080, ditto-read-subjects=[\"nginx:ditto\"], user-agent=curl/8.7.1] - Message payload: {\"type\":\"things.events:featureModified\",\"_timestamp\":\"2024-05-03T09:24:20.493875969Z\",\"_metadata\":null,\"revision\":11,\"thingId\":\"org.eclipse.ditto:myThing:123\",\"featureId\":\"lamp\",\"feature\":{\"properties\":{\"foo\":\"bar\"}}}",
      "address": "eventTopic",
      "entityType": "thing",
      "entityId": "org.eclipse.ditto:myThing:123"
    },
    {
      "correlationId": "a2306b00-9c37-4334-a285-bdd520017c7d",
      "timestamp": "2024-05-03T09:24:20.516700844Z",
      "category": "target",
      "type": "filtered",
      "level": "success",
      "message": "Signal successfully passed possible filters.",
      "address": "eventTopic",
      "entityType": "thing",
      "entityId": "org.eclipse.ditto:myThing:123"
    },
    {
      "correlationId": "a2306b00-9c37-4334-a285-bdd520017c7d",
      "timestamp": "2024-05-03T09:24:20.586274286Z",
      "category": "target",
      "type": "mapped",
      "level": "success",
      "message": "Mapped outgoing signal with mapper <default>",
      "address": "eventTopic",
      "entityType": "thing",
      "entityId": "org.eclipse.ditto:myThing:123"
    },
    {
      "correlationId": "a2306b00-9c37-4334-a285-bdd520017c7d",
      "timestamp": "2024-05-03T09:24:20.606415085Z",
      "category": "target",
      "type": "published",
      "level": "success",
      "message": "Successfully published signal. - Message headers: [content-type=application/vnd.eclipse.ditto+json, correlation-id=a2306b00-9c37-4334-a285-bdd520017c7d] - Message payload: {\"topic\":\"org.eclipse.ditto/myThing:123/things/twin/events/modified\",\"headers\":{\"authorization\":\"Basic ZGl0dG86ZGl0dG8=\",\"x-real-ip\":\"172.18.0.1\",\"x-forwarded-user\":\"ditto\",\"x-ditto-pre-authenticated\":\"nginx:ditto\",\"host\":\"localhost:8080\",\"x-forwarded-for\":\"172.18.0.1\",\"accept\":\"*/*\",\"user-agent\":\"curl/8.7.1\",\"ditto-originator\":\"nginx:ditto\",\"response-required\":false,\"requested-acks\":[],\"content-type\":\"application/json\",\"correlation-id\":\"a2306b00-9c37-4334-a285-bdd520017c7d\"},\"path\":\"/features/lamp\",\"value\":{\"properties\":{\"foo\":\"bar\"}},\"extra\":{\"thingId\":\"org.eclipse.ditto:myThing:123\",\"features\":{\"water-tank\":{\"properties\":{\"foo\":\"bar\"}}},\"attributes\":{\"location\":\"Berlin, main floor\"}},\"revision\":11,\"timestamp\":\"2024-05-03T09:24:20.493875969Z\"}",
      "address": "eventTopic"
    }
  ],
  "enabledSince": "2024-05-03T09:33:01.831675300Z",
  "enabledUntil": "2024-05-03T10:33:01.833136688Z"
}
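
Which signals stopped after the dispatched stage can also be read off logs like these programmatically. The Python sketch below groups target log entries by correlationId and lists the ones that never reached published. The function names are invented for this example, and SAMPLE is a heavily abbreviated stand-in for the logs above that keeps only correlationId, category and type.

```python
def stages_by_correlation_id(logs: dict) -> dict[str, list[str]]:
    """Map each correlationId to the ordered list of target log entry types."""
    stages: dict[str, list[str]] = {}
    for entry in logs.get("connectionLogs", []):
        if entry.get("category") != "target":
            continue  # skip connection-level entries such as the log reset
        stages.setdefault(entry["correlationId"], []).append(entry["type"])
    return stages


def not_published(logs: dict) -> list[str]:
    """Return correlationIds whose signal was never published."""
    return [cid for cid, types in stages_by_correlation_id(logs).items()
            if "published" not in types]


def _entries(correlation_id: str, *types: str) -> list[dict]:
    """Build abbreviated target log entries for the sample below."""
    return [{"correlationId": correlation_id, "category": "target", "type": t}
            for t in types]


# Abbreviated from the logs above: thing created, water-tank updated, lamp updated.
SAMPLE = {"connectionLogs": [
    {"correlationId": "8dda0473-9ea8-43ff-bf28-d4a476ffae11",
     "category": "connection", "type": "other"},
    *_entries("24767649-4f65-4874-9f21-ad232a19361a",
              "dispatched", "filtered", "mapped", "published"),
    *_entries("fabbefdc-2d9a-4ffe-b772-87df269cdf90", "dispatched"),
    *_entries("a2306b00-9c37-4334-a285-bdd520017c7d",
              "dispatched", "filtered", "mapped", "published"),
]}

print(not_published(SAMPLE))
```

With the full logs saved to a file, the same functions can be applied to the result of json.load on that file.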

Here are the events I see in Kafka (I'm using the command kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic eventTopic in the Kafka container):

{"topic":"org.eclipse.ditto/myThing:123/things/twin/events/created","headers":{"sec-fetch-mode":"cors","referer":"http://localhost:8080/ui/","x-ditto-pre-authenticated":"nginx:ditto","sec-fetch-site":"same-origin","cookie":".AspNetCore.Antiforgery.P_Dp5CLNiMg=CfDJ8EsrvQfSA0lDjqCxsYQfCHfsIwLKzeW0GSHtFQ-_km-beQOug9wZEm5ctF2ZoQNQ-FypJc2ZWPbI7HQQK_McYdFGOlry7K8LzOkGvQjSO971P9wxTb_pVvG0o0u1NR1i3ljZqJEpEA6SEpI278_Dhl0; .AspNetCore.Antiforgery.8vFUR3_kimI=CfDJ8EsrvQfSA0lDjqCxsYQfCHfO5jmjvCr-hw6-WX5H38gaDdbRs9B-QivzCQjvik0fzWhOh0PVL0SYDqrNr8wIxLJSismJPCv3iocH6K16UkDU6iGpsTHR18J8opgm3GD8-rCqZIslFupzOx1VZ0Mlxog","accept-language":"en-US, en;q=0.5","origin":"http://localhost:8080","dnt":"1","x-forwarded-for":"172.18.0.1","pragma":"no-cache","accept":"*/*","authorization":"Basic ZGl0dG86ZGl0dG8=","x-real-ip":"172.18.0.1","x-forwarded-user":"ditto","host":"localhost:8080","sec-fetch-dest":"empty","user-agent":"Mozilla/5.0 (X11; Linux x86_64; rv:124.0) Gecko/20100101 Firefox/124.0","ditto-originator":"nginx:ditto","response-required":false,"requested-acks":[],"content-type":"application/json","correlation-id":"24767649-4f65-4874-9f21-ad232a19361a"},"path":"/","value":{"thingId":"org.eclipse.ditto:myThing:123","policyId":"org.eclipse.ditto:myThing:123","attributes":{"manufacturer":"ACME demo corp.","location":"Berlin, main floor","serialno":"42","model":"Speaking coffee machine"},"features":{"water-tank":{"properties":{},"desiredProperties":{}},"lamp":{"properties":{},"desiredProperties":{}}}},"extra":{"thingId":"org.eclipse.ditto:myThing:123","features":{"water-tank":{"properties":{}}},"attributes":{"location":"Berlin, main floor"}},"revision":9,"timestamp":"2024-05-03T09:23:17.128058725Z"}
{"topic":"org.eclipse.ditto/myThing:123/things/twin/events/modified","headers":{"authorization":"Basic ZGl0dG86ZGl0dG8=","x-real-ip":"172.18.0.1","x-forwarded-user":"ditto","x-ditto-pre-authenticated":"nginx:ditto","host":"localhost:8080","x-forwarded-for":"172.18.0.1","accept":"*/*","user-agent":"curl/8.7.1","ditto-originator":"nginx:ditto","response-required":false,"requested-acks":[],"content-type":"application/json","correlation-id":"a2306b00-9c37-4334-a285-bdd520017c7d"},"path":"/features/lamp","value":{"properties":{"foo":"bar"}},"extra":{"thingId":"org.eclipse.ditto:myThing:123","features":{"water-tank":{"properties":{"foo":"bar"}}},"attributes":{"location":"Berlin, main floor"}},"revision":11,"timestamp":"2024-05-03T09:24:20.493875969Z"}
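
To see at a glance which change each Kafka message belongs to, the topic, path and extra fields of every line can be summarized. This is a small Python sketch, assuming one JSON document per consumer output line as above. The summarize function is made up for illustration, and the sample lines are shortened copies of the two events above with headers and most of the value omitted.

```python
import json


def summarize(line: str) -> tuple[str, str, list[str]]:
    """Return (event action, changed path, top-level keys of 'extra')."""
    message = json.loads(line)
    action = message["topic"].rsplit("/", 1)[-1]
    return action, message["path"], sorted(message.get("extra", {}))


# Shortened copies of the two published events (headers omitted).
SAMPLE_LINES = [
    '{"topic":"org.eclipse.ditto/myThing:123/things/twin/events/created",'
    '"path":"/","value":{},'
    '"extra":{"thingId":"org.eclipse.ditto:myThing:123",'
    '"features":{"water-tank":{"properties":{}}},'
    '"attributes":{"location":"Berlin, main floor"}},"revision":9}',
    '{"topic":"org.eclipse.ditto/myThing:123/things/twin/events/modified",'
    '"path":"/features/lamp","value":{"properties":{"foo":"bar"}},'
    '"extra":{"thingId":"org.eclipse.ditto:myThing:123",'
    '"features":{"water-tank":{"properties":{"foo":"bar"}}},'
    '"attributes":{"location":"Berlin, main floor"}},"revision":11}',
]

for line in SAMPLE_LINES:
    print(summarize(line))
```

A message whose path is /features/water-tank would indicate that the water-tank update was published; in the two events above only / and /features/lamp appear.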

Could you please provide the following:

  • the Ditto version you use
  • connection logs
    Please enable connection logs, run the test and post the logs here. Logs can be enabled using the HTTP API or in the Ditto UI under the "Connections" tab at the top.
    Logs can be viewed in the Ditto UI as well, or requested using the HTTP API. I believe requesting them via the HTTP API is better here, as the UI does not show the whole logs at once and JSON can be checked more easily.
  • the events published to Kafka
