Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 49 additions & 58 deletions node-red-node-wot/src/wot-event.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ module.exports = function (RED) {
let node = this
let consumedThing
let subscription
let repeatId

this.status({})

Expand All @@ -20,81 +19,73 @@ module.exports = function (RED) {

const thingNode = RED.nodes.getNode(config.thing)
thingNode.addUpdateTDListener(async (_consumedThing) => {
if (repeatId) {
clearInterval(repeatId)
repeatId = undefined
}
if (subscription) {
// Stop if already subscribed
await subscription.stop()
}
subscription = undefined
consumedThing = _consumedThing
// Repeat until event subscription succeeds.
await new Promise((resolve) => {
repeatId = setInterval(() => {
consumedThing
.subscribeEvent(
config.event,
async (resp) => {
if (resp) {
let payload
try {
payload = await resp.value()
} catch (err) {
node.error(`[error] failed to get event. err: ${err.toString()}`)
console.error(`[error] failed to get event. err: `, err)
while (true) {
try {
await new Promise((resolve, reject) => {
consumedThing
.subscribeEvent(
config.event,
async (resp) => {
if (resp) {
let payload
try {
payload = await resp.value()
} catch (err) {
node.error(`[error] failed to get event. err: ${err.toString()}`)
console.error(`[error] failed to get event. err: `, err)
}
node.send({ payload, topic: config.topic })
}
node.send({ payload, topic: config.topic })
node.status({
fill: "green",
shape: "dot",
text: "Subscribed",
})
},
(err) => {
console.error("[error] subscribe events.", err)
node.error(`[error] subscribe events. err: ${err.toString()}`)
node.status({
fill: "red",
shape: "ring",
text: "Subscription error",
})
reject(err)
}
)
.then((sub) => {
subscription = sub
node.status({
fill: "green",
shape: "dot",
text: "Subscribed",
})
},
(err) => {
console.error("[error] subscribe events.", err)
node.error(`[error] subscribe events. err: ${err.toString()}`)
node.status({
fill: "red",
shape: "ring",
text: "Subscription error",
})
},
() => {
console.error("[warn] Subscription ended.")
node.warn("[warn] Subscription ended.")
node.status({})
subscription = undefined
}
)
.then((sub) => {
subscription = sub
clearInterval(repeatId)
repeatId = undefined
resolve()
})
.catch((err) => {
console.warn("[warn] event subscribe error. try again. error: " + err)
})
}, 1000)
})

if (subscription) {
node.status({
fill: "green",
shape: "dot",
text: "Subscribed",
})
})
.catch(reject)
});
} catch (err) {
console.warn("[warn] event subscribe error. try again. error: " + err)
node.status({
fill: "red",
shape: "ring",
text: "Subscription error",
})
}
if (subscription) {
await subscription.stop()
}
await new Promise((resolve) => setTimeout(resolve, 1000))
}
})

this.on("close", async function (removed, done) {
if (repeatId) {
clearInterval(repeatId)
repeatId = undefined
}
if (subscription) {
// Stop if already subscribed
await subscription.stop()
Expand Down
104 changes: 52 additions & 52 deletions node-red-node-wot/src/wot-property.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ module.exports = function (RED) {
let node = this
let consumedThing
let subscription
let repeatId

this.status({})

Expand All @@ -24,10 +23,6 @@ module.exports = function (RED) {

const thingNode = RED.nodes.getNode(config.thing)
thingNode.addUpdateTDListener(async (_consumedThing) => {
if (repeatId) {
clearInterval(repeatId)
repeatId = undefined
}
if (subscription) {
// Stop if already subscribed
await subscription.stop()
Expand All @@ -38,53 +33,62 @@ module.exports = function (RED) {
return
}
// Repeat until observeProperty succeeds.
await new Promise((resolve) => {
repeatId = setInterval(() => {
consumedThing
.observeProperty(
config.property,
async (resp) => {
let payload
try {
payload = await resp.value()
} catch (err) {
node.error(`[error] failed to get property change. err: ${err.toString()}`)
console.error(`[error] failed to get property change. err:`, err)
while (true) {
try {
await new Promise((resolve, reject) => {
consumedThing
.observeProperty(
config.property,
async (resp) => {
if (resp) {
let payload
try {
payload = await resp.value()
} catch (err) {
node.error(`[error] failed to get property change. err: ${err.toString()}`)
console.error(`[error] failed to get property change. err:`, err)
}
node.send({ payload, topic: config.topic })
}
node.status({
fill: "green",
shape: "dot",
text: "Subscribed",
})
},
(err) => {
node.error(`[error] property observe error. error: ${err.toString()}`)
console.error(`[error] property observe error. error: `, err)
node.status({
fill: "red",
shape: "ring",
text: "Observe error",
})
reject(err)
}
node.send({ payload, topic: config.topic })
},
(err) => {
node.error(`[error] property observe error. error: ${err.toString()}`)
console.error(`[error] property observe error. error: `, err)
)
.then((sub) => {
subscription = sub
node.status({
fill: "red",
shape: "ring",
text: "Observe error",
fill: "green",
shape: "dot",
text: "connected",
})
}
)
.then((sub) => {
subscription = sub
clearInterval(repeatId)
repeatId = undefined
resolve()
})
.catch((err) => {
console.warn("[warn] property observe error. try again. error: " + err)
node.status({
fill: "red",
shape: "ring",
text: "Observe error",
})
})
}, 1000)
})
if (subscription) {
node.status({
fill: "green",
shape: "dot",
text: "connected",
})
.catch(reject)
});
} catch (err) {
console.warn("[warn] property observe error. try again. error: " + err)
node.status({
fill: "red",
shape: "ring",
text: "Observe error",
})
}
if (subscription) {
await subscription.stop()
}
await new Promise((resolve) => setTimeout(resolve, 1000))
}
})

Expand Down Expand Up @@ -126,10 +130,6 @@ module.exports = function (RED) {
})

node.on("close", async function (removed, done) {
if (repeatId) {
clearInterval(repeatId)
repeatId = undefined
}
if (subscription) {
// Stop if already subscribed
await subscription.stop()
Expand Down
Loading