// agenda.js (modified to receive the already created pool)

const Agenda = require('agenda');

// Module-level state. All three are assigned by initializeAgenda() and stay
// undefined until it has been called.

// The Agenda instance created by initializeAgenda().
let agenda;

// Object passed to initializeAgenda() as `io`; job handlers call
// `.to(socketId).emit(event, payload)` on it.
let socketIoInstance;

// This will hold the pool instance passed from server.js
let sharedPgPool;

/**
 * Creates the module's Agenda instance, registers every job definition and
 * starts job processing.
 *
 * Side effects: stores `pool` and `io` in the module-level `sharedPgPool` and
 * `socketIoInstance` variables (the job handlers read them from there) and
 * assigns the new instance to the module-level `agenda`.
 *
 * Registered jobs: calib, cancel_calib, posChange, posChangeScheduled,
 * groupPosChangeScheduled, deleteUnverifiedUser, deletePasswordResetToken,
 * deleteUserPendingEmail.
 *
 * @param {string} [mongoUri] - MongoDB address for Agenda's job store; falls
 *   back to 'mongodb://localhost:27017/myScheduledApp' when falsy.
 * @param {object} pool - Already-created database pool; only
 *   `pool.query(sql, params)` is used.
 * @param {object} io - Socket server; only `io.to(id).emit(event, payload)`
 *   is used.
 * @returns {Promise<object>} The started Agenda instance.
 */
const initializeAgenda = async (mongoUri, pool, io) => {
  socketIoInstance = io;
  sharedPgPool = pool;

  agenda = new Agenda({
    db: {
      address: mongoUri || 'mongodb://localhost:27017/myScheduledApp',
      collection: 'agendaJobs',
    },
  });

  // Listeners are attached before start(): start() waits for the database
  // connection, so a 'ready' listener added afterwards would miss the event.
  agenda.on('ready', () => console.log('Agenda connected to MongoDB and ready!'));
  agenda.on('start', (job) => console.log(`Job "${job.attrs.name}" starting`));
  agenda.on('complete', (job) => console.log(`Job "${job.attrs.name}" complete`));
  agenda.on('success', (job) => console.log(`Job "${job.attrs.name}" succeeded`));
  agenda.on('fail', (err, job) => console.error(`Job "${job.attrs.name}" failed: ${err.message}`));

  // Jobs that log and rethrow, so Agenda records the failure.
  agenda.define('calib', loggedJob('Error processing job:', runCalibration));
  agenda.define('cancel_calib', loggedJob('Error processing job:', cancelCalibration));
  agenda.define(
    'posChange',
    loggedJob('Error processing job:', (data) => applyPositionChanges(data, false)),
  );
  agenda.define(
    'posChangeScheduled',
    loggedJob('Error processing job:', (data) => applyPositionChanges(data, true)),
  );
  agenda.define(
    'groupPosChangeScheduled',
    loggedJob('Error processing group schedule job:', applyGroupPosition),
  );

  // Best-effort cleanup jobs: failures are logged but never rethrown.
  agenda.define('deleteUnverifiedUser', async (job) => {
    const { userId } = job.attrs.data;
    await runCleanup(
      'DELETE FROM users WHERE id = $1 AND is_verified = false',
      [userId],
      `Deleted unverified user with ID ${userId}`,
      `Error deleting unverified user ${userId}:`,
    );
  });

  agenda.define('deletePasswordResetToken', async (job) => {
    const { email } = job.attrs.data;
    await runCleanup(
      'DELETE FROM password_reset_tokens WHERE email = $1',
      [email],
      `Deleted expired password reset token for ${email}`,
      `Error deleting password reset token for ${email}:`,
    );
  });

  agenda.define('deleteUserPendingEmail', async (job) => {
    const { userId } = job.attrs.data;
    await runCleanup(
      'DELETE FROM user_pending_emails WHERE user_id = $1',
      [userId],
      `Deleted expired pending email change for user ${userId}`,
      `Error deleting pending email for user ${userId}:`,
    );
  });

  // start() is called exactly once, after every definition and listener exists.
  await agenda.start();
  console.log('Agenda job processing started.');
  return agenda;
};

/** Wraps a data handler as an Agenda job that logs any error under `label` and rethrows it. */
function loggedJob(label, handler) {
  return async (job) => {
    try {
      await handler(job.attrs.data);
    } catch (error) {
      console.error(label, error);
      throw error;
    }
  };
}

/** Runs one DELETE statement, logging `successMessage` when rows were removed; errors are logged, not thrown. */
async function runCleanup(sql, params, successMessage, failureMessage) {
  try {
    const { rowCount } = await sharedPgPool.query(sql, params);
    if (rowCount > 0) {
      console.log(successMessage);
    }
  } catch (error) {
    console.error(failureMessage, error);
  }
}

/**
 * Emits `event` to the socket stored for a connected device.
 * Returns true when exactly one connected device_tokens row exists (the emit
 * itself is skipped if that row has no socket value), false otherwise.
 */
async function notifyDevice(deviceID, event, payload) {
  const { rows } = await sharedPgPool.query(
    'select socket from device_tokens where device_id=$1 and connected=TRUE',
    [deviceID],
  );
  if (rows.length !== 1) {
    console.log('No device with that ID connected to Socket.');
    return false;
  }
  const [{ socket }] = rows;
  if (socket) {
    socketIoInstance.to(socket).emit(event, payload);
  }
  return true;
}

/** Returns the user's socket value when exactly one user_tokens row with a socket exists, else null. */
async function findUserSocket(userID) {
  const { rows } = await sharedPgPool.query(
    'select socket from user_tokens where user_id=$1',
    [userID],
  );
  return rows.length === 1 && rows[0].socket ? rows[0].socket : null;
}

/** 'calib' job body: clears the peripheral's calibration flags and notifies device and app. */
async function runCalibration({ periphID, userID }) {
  const result = await sharedPgPool.query(
    'update peripherals set await_calib=FALSE, calibrated=FALSE where id=$1 and user_id=$2 returning device_id, peripheral_number, id',
    [periphID, userID],
  );
  if (result.rowCount !== 1) {
    throw new Error('No such peripheral in database');
  }
  const [peripheral] = result.rows;

  const deviceConnected = await notifyDevice(peripheral.device_id, 'calib_start', {
    port: peripheral.peripheral_number,
  });
  // Looked up once and reused for both app notifications below.
  const userSocket = await findUserSocket(userID);

  if (!deviceConnected) {
    // Notify user that device is not connected
    if (userSocket) {
      socketIoInstance.to(userSocket).emit('calib_error', {
        periphID: peripheral.id,
        message: 'Device not connected',
      });
    }
    // Reset await_calib since calibration cannot proceed
    await sharedPgPool.query('update peripherals set await_calib=FALSE where id=$1', [periphID]);
  }

  // NOTE(review): 'calib' is emitted even when the device was not connected
  // (i.e. right after 'calib_error'). Kept as-is; confirm the app expects both.
  if (userSocket) {
    socketIoInstance.to(userSocket).emit('calib', { periphID: peripheral.id });
  } else {
    console.log('No App connected');
  }
}

/** 'cancel_calib' job body: clears await_calib and tells the device to cancel. */
async function cancelCalibration({ periphID, userID }) {
  const result = await sharedPgPool.query(
    'update peripherals set await_calib=FALSE where id=$1 and user_id=$2 returning device_id, peripheral_number',
    [periphID, userID],
  );
  if (result.rowCount !== 1) {
    throw new Error('No such peripheral in database');
  }
  const [peripheral] = result.rows;
  await notifyDevice(peripheral.device_id, 'cancel_calib', {
    port: peripheral.peripheral_number,
  });
}

/**
 * Shared body of 'posChange' and 'posChangeScheduled'.
 * `changedPosList` is of the structure [{periphNum, periphID, pos}].
 * Stores each position, sends the device the list without periphID and, when
 * `notifyApp` is true, sends the user's app the list without periphNum.
 */
async function applyPositionChanges({ deviceID, changedPosList, userID }, notifyApp) {
  if (!Array.isArray(changedPosList)) {
    console.log('undefined list');
    throw new Error('changedPosList is missing or not an array');
  }
  const dateTime = new Date();

  for (const { pos, periphID } of changedPosList) {
    const result = await sharedPgPool.query(
      'update peripherals set last_pos=$1, last_set=$2 where id=$3 and user_id=$4',
      [pos, dateTime, periphID, userID],
    );
    if (result.rowCount !== 1) {
      throw new Error('No such peripheral in database');
    }
  }

  const posWithoutID = changedPosList.map(({ periphID, ...rest }) => rest);
  await notifyDevice(deviceID, 'posUpdates', posWithoutID);

  if (!notifyApp) {
    return;
  }
  const userSocket = await findUserSocket(userID);
  if (userSocket) {
    const posWithoutNumber = changedPosList.map(({ periphNum, ...rest }) => rest);
    socketIoInstance.to(userSocket).emit('posUpdates', posWithoutNumber);
  } else {
    console.log('No App connected');
  }
}

/** 'groupPosChangeScheduled' job body: applies `newPos` to every peripheral currently in the group. */
async function applyGroupPosition({ groupID, newPos, userID }) {
  const dateTime = new Date();

  // Query current group members at execution time
  const { rows: peripheralRows } = await sharedPgPool.query(
    `SELECT p.id, p.peripheral_number, p.device_id
     FROM peripherals p
     JOIN group_peripherals gp ON p.id = gp.peripheral_id
     JOIN groups g ON gp.group_id = g.id
     WHERE g.id = $1 AND g.user_id = $2`,
    [groupID, userID],
  );
  if (peripheralRows.length === 0) {
    console.log(`No peripherals found in group ${groupID}`);
    return;
  }

  // Update database for all peripherals
  for (const periph of peripheralRows) {
    await sharedPgPool.query(
      'UPDATE peripherals SET last_pos=$1, last_set=$2 WHERE id=$3',
      [newPos, dateTime, periph.id],
    );
  }

  // Group peripherals by device_id so each device receives a single event.
  const updatesByDevice = new Map();
  for (const periph of peripheralRows) {
    if (!updatesByDevice.has(periph.device_id)) {
      updatesByDevice.set(periph.device_id, []);
    }
    updatesByDevice.get(periph.device_id).push({
      periphNum: periph.peripheral_number,
      pos: newPos,
    });
  }
  for (const [deviceId, updates] of updatesByDevice) {
    await notifyDevice(deviceId, 'posUpdates', updates);
  }

  // Notify user app
  const userSocket = await findUserSocket(userID);
  if (userSocket) {
    const posWithoutNumber = peripheralRows.map((p) => ({ periphID: p.id, pos: newPos }));
    socketIoInstance.to(userSocket).emit('posUpdates', posWithoutNumber);
  }
}

module.exports = {
|
|
agenda,
|
|
initializeAgenda
|
|
}; |