// agenda.js — Agenda job definitions. Modified to receive the already-created
// PostgreSQL pool (and the Socket.IO instance) from the caller instead of creating its own.
const Agenda = require ( 'agenda' ) ;
/** Agenda instance; stays undefined until initializeAgenda() has created it. */
let agenda;

/** Socket.IO instance handed to initializeAgenda(); job handlers emit events through it. */
let socketIoInstance;

/** PostgreSQL pool instance passed in from server.js; every job query runs on it. */
let sharedPgPool;
// SQL shared by the jobs that need to reach a connected device or the user's app.
const DEVICE_SOCKET_SQL = "select socket from device_tokens where device_id=$1 and connected=TRUE";
const USER_SOCKET_SQL = "select socket from user_tokens where user_id=$1";
const UPDATE_POSITION_SQL = "update peripherals set last_pos=$1, last_set=$2 where id=$3 and user_id=$4";
const NO_PERIPHERAL_MESSAGE = "No such peripheral in database";
const CLEANUP_JOB_NAME = 'delete unverified users';

/** Returns shallow copies of the entries in `list` with the property `key` removed. */
const omitKey = (list, key) => list.map(({ [key]: _dropped, ...rest }) => rest);

/**
 * Emits `event` to the socket stored for a connected device.
 * Resolves to false when the lookup does not return exactly one row, true otherwise
 * (even if the stored socket value is empty, in which case nothing is emitted).
 */
const emitToDevice = async (deviceID, event, payload, logMissing = true) => {
  const { rows } = await sharedPgPool.query(DEVICE_SOCKET_SQL, [deviceID]);
  if (rows.length !== 1) {
    if (logMissing) console.log("No device with that ID connected to Socket.");
    return false;
  }
  if (rows[0].socket) {
    socketIoInstance.to(rows[0].socket).emit(event, payload);
  }
  return true;
};

/** Emits `event` to the socket stored for the user's app, if exactly one row exists. */
const emitToUser = async (userID, event, payload, logMissing = true) => {
  const { rows } = await sharedPgPool.query(USER_SOCKET_SQL, [userID]);
  if (rows.length !== 1) {
    if (logMissing) console.log("No App connected");
    return;
  }
  if (rows[0] && rows[0].socket) {
    socketIoInstance.to(rows[0].socket).emit(event, payload);
  }
};

/** Wraps a job handler so failures are logged with `message` and then rethrown to Agenda. */
const logAndRethrow = (message, handler) => async (job) => {
  try {
    await handler(job);
  } catch (error) {
    console.error(message, error);
    throw error;
  }
};

/** 'calib' job: flags the peripheral as awaiting calibration and notifies device and app. */
const runCalib = async (job) => {
  const { periphID, userID } = job.attrs.data;
  const result = await sharedPgPool.query(
    "update peripherals set await_calib=TRUE, calibrated=FALSE where id=$1 and user_id=$2 returning device_id, peripheral_number, id",
    [periphID, userID]
  );
  if (result.rowCount !== 1) throw new Error(NO_PERIPHERAL_MESSAGE);
  const [peripheral] = result.rows;

  const deviceFound = await emitToDevice(
    peripheral.device_id,
    "calib_start",
    { port: peripheral.peripheral_number }
  );
  if (!deviceFound) {
    // Tell the app the device is offline, then reset await_calib since calibration cannot proceed.
    await emitToUser(
      userID,
      "calib_error",
      { periphID: peripheral.id, message: "Device not connected" },
      false
    );
    await sharedPgPool.query("update peripherals set await_calib=FALSE where id=$1", [periphID]);
  }

  // NOTE(review): "calib" is still sent to the app after a "calib_error" (this matches the
  // previous behavior) — confirm whether the offline branch should return early instead.
  await emitToUser(userID, "calib", { periphID: peripheral.id });
};

/** 'cancel_calib' job: clears await_calib and tells the device to cancel calibration. */
const runCancelCalib = async (job) => {
  const { periphID, userID } = job.attrs.data;
  const result = await sharedPgPool.query(
    "update peripherals set await_calib=FALSE where id=$1 and user_id=$2 returning device_id, peripheral_number",
    [periphID, userID]
  );
  if (result.rowCount !== 1) throw new Error(NO_PERIPHERAL_MESSAGE);
  const [peripheral] = result.rows;
  await emitToDevice(peripheral.device_id, "cancel_calib", { port: peripheral.peripheral_number });
};

/**
 * Shared body of 'posChange' and 'posChangeScheduled'.
 * changedPosList is of the structure [{periphNum, periphID, pos}].
 * The device receives the list without periphID; when `notifyUser` is set the app
 * receives the list without periphNum.
 */
const runPosChange = async (job, notifyUser) => {
  const { deviceID, changedPosList, userID } = job.attrs.data;
  const dateTime = new Date();
  if (!changedPosList) console.log("undefined list");

  // Built before any write so a missing list fails (TypeError) without touching the database.
  const posWithoutID = omitKey(changedPosList, 'periphID');
  const posWithoutNumber = notifyUser ? omitKey(changedPosList, 'periphNum') : null;

  for (const { pos, periphID } of changedPosList) {
    const result = await sharedPgPool.query(UPDATE_POSITION_SQL, [pos, dateTime, periphID, userID]);
    if (result.rowCount !== 1) throw new Error(NO_PERIPHERAL_MESSAGE);
  }

  await emitToDevice(deviceID, "posUpdates", posWithoutID);
  if (notifyUser) {
    await emitToUser(userID, "posUpdates", posWithoutNumber);
  }
};

/** 'groupPosChangeScheduled' job: applies `newPos` to every peripheral currently in the group. */
const runGroupPosChange = async (job) => {
  const { groupID, newPos, userID } = job.attrs.data;
  const dateTime = new Date();

  // Query current group members at execution time.
  const { rows: peripheralRows } = await sharedPgPool.query(
    `SELECT p.id, p.peripheral_number, p.device_id
     FROM peripherals p
     JOIN group_peripherals gp ON p.id = gp.peripheral_id
     JOIN groups g ON gp.group_id = g.id
     WHERE g.id = $1 AND g.user_id = $2`,
    [groupID, userID]
  );
  if (peripheralRows.length === 0) {
    console.log(`No peripherals found in group ${groupID}`);
    return;
  }

  // Group peripherals by device_id so each device gets a single event.
  const deviceMap = new Map();
  for (const periph of peripheralRows) {
    if (!deviceMap.has(periph.device_id)) {
      deviceMap.set(periph.device_id, []);
    }
    deviceMap.get(periph.device_id).push({
      periphNum: periph.peripheral_number,
      periphID: periph.id,
      pos: newPos,
    });
  }

  // Update database for all peripherals.
  for (const periph of peripheralRows) {
    await sharedPgPool.query(
      "UPDATE peripherals SET last_pos=$1, last_set=$2 WHERE id=$3",
      [newPos, dateTime, periph.id]
    );
  }

  // Send socket events to each device (silently skipped when a device is not connected).
  for (const [deviceId, changedPosList] of deviceMap.entries()) {
    await emitToDevice(deviceId, "posUpdates", omitKey(changedPosList, 'periphID'), false);
  }

  // Notify user app.
  const posWithoutNumber = peripheralRows.map((p) => ({ periphID: p.id, pos: newPos }));
  await emitToUser(userID, "posUpdates", posWithoutNumber, false);
};

/** 'delete unverified users' job: best-effort cleanup; errors are logged, not rethrown. */
const runDeleteUnverifiedUsers = async () => {
  try {
    const result = await sharedPgPool.query(
      "DELETE FROM users WHERE is_verified = false AND created_at < NOW() - INTERVAL '24 hours'"
    );
    if (result.rowCount > 0) {
      console.log(`Cleanup: Deleted ${result.rowCount} unverified users.`);
    }
  } catch (error) {
    console.error("Error cleaning up users:", error);
  }
};

/** 'deletePasswordResetToken' job: best-effort delete; errors are logged, not rethrown. */
const runDeletePasswordResetToken = async (job) => {
  const { email } = job.attrs.data;
  try {
    const result = await sharedPgPool.query('DELETE FROM password_reset_tokens WHERE email = $1', [email]);
    if (result.rowCount > 0) {
      console.log(`Deleted expired password reset token for ${email}`);
    }
  } catch (error) {
    console.error(`Error deleting password reset token for ${email}:`, error);
  }
};

/**
 * Creates the Agenda instance, registers every job definition and starts processing.
 *
 * Stores `pool` and `io` in module state so the job handlers can use them, and
 * (re)schedules the recurring 'delete unverified users' job every 24 hours.
 *
 * @param {string} [mongoUri] - MongoDB connection string; falls back to the local default when falsy.
 * @param {object} pool - PostgreSQL pool created by the caller (must expose `query`).
 * @param {object} io - Socket.IO instance used to emit events to devices and apps.
 * @returns {Promise<object>} The started Agenda instance.
 */
const initializeAgenda = async (mongoUri, pool, io) => {
  socketIoInstance = io;
  sharedPgPool = pool; // Store the passed pool
  agenda = new Agenda({
    db: {
      address: mongoUri || 'mongodb://localhost:27017/myScheduledApp',
      collection: 'agendaJobs',
    },
  });

  agenda.define('calib', logAndRethrow(`Error processing job:`, runCalib));
  agenda.define('cancel_calib', logAndRethrow(`Error processing job:`, runCancelCalib));
  agenda.define('posChange', logAndRethrow(`Error processing job:`, (job) => runPosChange(job, false)));
  agenda.define('posChangeScheduled', logAndRethrow(`Error processing job:`, (job) => runPosChange(job, true)));
  agenda.define('groupPosChangeScheduled', logAndRethrow(`Error processing group schedule job:`, runGroupPosChange));
  agenda.define(CLEANUP_JOB_NAME, runDeleteUnverifiedUsers);
  agenda.define('deletePasswordResetToken', runDeletePasswordResetToken);

  // Listeners are attached before the first await so none of the lifecycle events are missed.
  agenda.on('ready', () => console.log('Agenda connected to MongoDB and ready!'));
  agenda.on('start', (job) => console.log(`Job "${job.attrs.name}" starting`));
  agenda.on('complete', (job) => console.log(`Job "${job.attrs.name}" complete`));
  agenda.on('success', (job) => console.log(`Job "${job.attrs.name}" succeeded`));
  agenda.on('fail', (err, job) => console.error(`Job "${job.attrs.name}" failed: ${err.message}`));

  // Start exactly once (the previous code awaited start() twice).
  await agenda.start();

  // Drop any previously stored schedule so the recurring cleanup job is not duplicated.
  await agenda.cancel({ name: CLEANUP_JOB_NAME });
  await agenda.every('24 hours', CLEANUP_JOB_NAME);

  console.log('Agenda job processing started.');
  return agenda;
};
module . exports = {
agenda ,
initializeAgenda
} ;