groups finished, error check needed
This commit is contained in:
76
agenda.js
76
agenda.js
@@ -168,6 +168,82 @@ const initializeAgenda = async (mongoUri, pool, io) => { // Now accepts pgPool
|
||||
}
|
||||
});
|
||||
|
||||
agenda.define('groupPosChangeScheduled', async (job) => {
|
||||
const { groupID, newPos, userID } = job.attrs.data;
|
||||
const dateTime = new Date();
|
||||
try {
|
||||
// Query current group members at execution time
|
||||
const {rows: peripheralRows} = await sharedPgPool.query(
|
||||
`SELECT p.id, p.peripheral_number, p.device_id
|
||||
FROM peripherals p
|
||||
JOIN group_peripherals gp ON p.id = gp.peripheral_id
|
||||
JOIN groups g ON gp.group_id = g.id
|
||||
WHERE g.id = $1 AND g.user_id = $2`,
|
||||
[groupID, userID]
|
||||
);
|
||||
|
||||
if (peripheralRows.length === 0) {
|
||||
console.log(`No peripherals found in group ${groupID}`);
|
||||
return;
|
||||
}
|
||||
|
||||
// Group peripherals by device_id
|
||||
const deviceMap = new Map();
|
||||
for (const periph of peripheralRows) {
|
||||
if (!deviceMap.has(periph.device_id)) {
|
||||
deviceMap.set(periph.device_id, []);
|
||||
}
|
||||
deviceMap.get(periph.device_id).push({
|
||||
periphNum: periph.peripheral_number,
|
||||
periphID: periph.id,
|
||||
pos: newPos
|
||||
});
|
||||
}
|
||||
|
||||
// Update database for all peripherals
|
||||
for (const periph of peripheralRows) {
|
||||
await sharedPgPool.query(
|
||||
"UPDATE peripherals SET last_pos=$1, last_set=$2 WHERE id=$3",
|
||||
[newPos, dateTime, periph.id]
|
||||
);
|
||||
}
|
||||
|
||||
// Send socket events to each device
|
||||
for (const [deviceId, changedPosList] of deviceMap.entries()) {
|
||||
const {rows: deviceRows} = await sharedPgPool.query(
|
||||
"SELECT socket FROM device_tokens WHERE device_id=$1 AND connected=TRUE",
|
||||
[deviceId]
|
||||
);
|
||||
|
||||
if (deviceRows.length === 1 && deviceRows[0].socket) {
|
||||
const posWithoutID = changedPosList.map(pos => {
|
||||
const { periphID, ...rest } = pos;
|
||||
return rest;
|
||||
});
|
||||
socketIoInstance.to(deviceRows[0].socket).emit("posUpdates", posWithoutID);
|
||||
}
|
||||
}
|
||||
|
||||
// Notify user app
|
||||
const {rows: userRows} = await sharedPgPool.query(
|
||||
"SELECT socket FROM user_tokens WHERE user_id=$1",
|
||||
[userID]
|
||||
);
|
||||
|
||||
if (userRows.length === 1 && userRows[0] && userRows[0].socket) {
|
||||
const posWithoutNumber = peripheralRows.map(p => ({
|
||||
periphID: p.id,
|
||||
pos: newPos
|
||||
}));
|
||||
socketIoInstance.to(userRows[0].socket).emit("posUpdates", posWithoutNumber);
|
||||
}
|
||||
|
||||
} catch (error) {
|
||||
console.error(`Error processing group schedule job:`, error);
|
||||
throw error;
|
||||
}
|
||||
});
|
||||
|
||||
agenda.on('ready', () => console.log('Agenda connected to MongoDB and ready!'));
|
||||
agenda.on('start', (job) => console.log(`Job "${job.attrs.name}" starting`));
|
||||
agenda.on('complete', (job) => console.log(`Job "${job.attrs.name}" complete`));
|
||||
|
||||
Reference in New Issue
Block a user