2025-06-28 19:11:25 -05:00
// agenda.js (modified to receive the already created pool)
const Agenda = require ( 'agenda' ) ;
let agenda ;
2025-07-10 18:31:11 -05:00
let socketIoInstance ;
2025-06-28 19:11:25 -05:00
let sharedPgPool ; // This will hold the pool instance passed from server.js
2025-07-10 18:31:11 -05:00
const initializeAgenda = async ( mongoUri , pool , io ) => { // Now accepts pgPool
socketIoInstance = io ;
2025-06-28 19:11:25 -05:00
sharedPgPool = pool ; // Store the passed pool
agenda = new Agenda ( {
db : {
address : mongoUri || 'mongodb://localhost:27017/myScheduledApp' ,
collection : 'agendaJobs' ,
}
} ) ;
2025-07-10 18:31:11 -05:00
agenda . define ( 'calib' , async ( job ) => {
const { periphID , userID } = job . attrs . data ;
2025-06-28 19:11:25 -05:00
try {
2025-07-10 18:31:11 -05:00
const result = await sharedPgPool . query ( "update peripherals set await_calib=TRUE, calibrated=FALSE where id=$1 and user_id=$2 returning device_id, peripheral_number, id" ,
[ periphID , userID ]
) ;
if ( result . rowCount != 1 ) throw new Error ( "No such peripheral in database" ) ;
2025-06-28 19:11:25 -05:00
2025-07-10 18:31:11 -05:00
const { rows } = await sharedPgPool . query ( "select socket from device_tokens where device_id=$1 and connected=TRUE" , [ result . rows [ 0 ] . device _id ] ) ;
if ( rows . length != 1 ) console . log ( "No device with that ID connected to Socket." ) ;
else {
const socket = rows [ 0 ] . socket ;
if ( socket ) {
socketIoInstance . to ( socket ) . emit ( "calib" , { periphNum : result . rows [ 0 ] . peripheral _number } ) ;
}
}
2025-06-28 19:11:25 -05:00
2025-07-10 18:31:11 -05:00
const { rows : userRows } = await sharedPgPool . query ( "select socket from user_tokens where user_id=$1" , [ userID ] ) ;
2025-06-28 19:11:25 -05:00
2025-07-10 18:31:11 -05:00
if ( userRows . length == 1 ) {
if ( userRows [ 0 ] ) {
const userSocket = userRows [ 0 ] . socket ;
socketIoInstance . to ( userSocket ) . emit ( "calib" , { periphID : result . rows [ 0 ] . id } ) ;
}
}
else console . log ( "No App connected" ) ;
} catch ( error ) {
console . error ( ` Error processing job: ` , error ) ;
throw error ;
}
} ) ;
agenda . define ( 'cancel_calib' , async ( job ) => {
const { periphID , userID } = job . attrs . data ;
try {
const result = await sharedPgPool . query ( "update peripherals set await_calib=FALSE where id=$1 and user_id=$2 returning device_id, peripheral_number" ,
[ periphID , userID ]
) ;
if ( result . rowCount != 1 ) throw new Error ( "No such peripheral in database" ) ;
const { rows } = await sharedPgPool . query ( "select socket from device_tokens where device_id=$1 and connected=TRUE" , [ result . rows [ 0 ] . device _id ] ) ;
if ( rows . length != 1 ) console . log ( "No device with that ID connected to Socket." ) ;
else {
const socket = rows [ 0 ] . socket ;
if ( socket ) {
socketIoInstance . to ( socket ) . emit ( "cancel_calib" , { periphNum : result . rows [ 0 ] . peripheral _number } ) ;
}
}
} catch ( error ) {
console . error ( ` Error processing job: ` , error ) ;
throw error ;
}
} )
agenda . define ( 'posChange' , async ( job ) => {
const { deviceID , changedPosList , userID } = job . attrs . data ;
// changedPosList is of the structure [{periphNum, periphID, pos}]
const dateTime = new Date ( ) ;
if ( ! changedPosList ) console . log ( "undefined list" ) ;
try {
const posWithoutID = changedPosList . map ( pos => {
const { periphID , ... rest } = pos ;
return rest ;
} ) ;
2025-12-23 17:22:33 -06:00
// const posWithoutNumber = changedPosList.map(pos => {
// const { periphNum, ...rest } = pos;
// return rest;
// });
2025-07-10 18:31:11 -05:00
for ( const pos of changedPosList ) {
const result = await sharedPgPool . query ( "update peripherals set last_pos=$1, last_set=$2 where id=$3 and user_id=$4" ,
[ pos . pos , dateTime , pos . periphID , userID ]
) ;
if ( result . rowCount != 1 ) throw new Error ( "No such peripheral in database" ) ;
}
const { rows } = await sharedPgPool . query ( "select socket from device_tokens where device_id=$1 and connected=TRUE" , [ deviceID ] ) ;
if ( rows . length != 1 ) console . log ( "No device with that ID connected to Socket." ) ;
else {
const socket = rows [ 0 ] . socket ;
if ( socket ) {
socketIoInstance . to ( socket ) . emit ( "posUpdates" , posWithoutID ) ;
}
}
} catch ( error ) {
console . error ( ` Error processing job: ` , error ) ;
throw error ;
}
} ) ;
agenda . define ( 'posChangeScheduled' , async ( job ) => {
const { deviceID , changedPosList , userID } = job . attrs . data ;
// changedPosList is of the structure [{periphNum, periphID, pos}]
const dateTime = new Date ( ) ;
if ( ! changedPosList ) console . log ( "undefined list" ) ;
try {
const posWithoutID = changedPosList . map ( pos => {
const { periphID , ... rest } = pos ;
return rest ;
} ) ;
const posWithoutNumber = changedPosList . map ( pos => {
const { periphNum , ... rest } = pos ;
return rest ;
} ) ;
for ( const pos of changedPosList ) {
const result = await sharedPgPool . query ( "update peripherals set last_pos=$1, last_set=$2 where id=$3 and user_id=$4" ,
[ pos . pos , dateTime , pos . periphID , userID ]
) ;
if ( result . rowCount != 1 ) throw new Error ( "No such peripheral in database" ) ;
}
const { rows } = await sharedPgPool . query ( "select socket from device_tokens where device_id=$1 and connected=TRUE" , [ deviceID ] ) ;
if ( rows . length != 1 ) console . log ( "No device with that ID connected to Socket." ) ;
else {
const socket = rows [ 0 ] . socket ;
if ( socket ) {
socketIoInstance . to ( socket ) . emit ( "posUpdates" , posWithoutID ) ;
}
}
const { rows : userRows } = await sharedPgPool . query ( "select socket from user_tokens where user_id=$1" , [ userID ] ) ;
2025-06-28 19:11:25 -05:00
2025-07-10 18:31:11 -05:00
if ( userRows . length == 1 ) {
if ( userRows [ 0 ] ) {
const userSocket = userRows [ 0 ] . socket ;
socketIoInstance . to ( userSocket ) . emit ( "posUpdates" , posWithoutNumber ) ;
}
}
else console . log ( "No App connected" ) ;
2025-06-28 19:11:25 -05:00
} catch ( error ) {
2025-07-10 18:31:11 -05:00
console . error ( ` Error processing job: ` , error ) ;
2025-06-28 19:11:25 -05:00
throw error ;
}
} ) ;
agenda . on ( 'ready' , ( ) => console . log ( 'Agenda connected to MongoDB and ready!' ) ) ;
agenda . on ( 'start' , ( job ) => console . log ( ` Job " ${ job . attrs . name } " starting ` ) ) ;
agenda . on ( 'complete' , ( job ) => console . log ( ` Job " ${ job . attrs . name } " complete ` ) ) ;
agenda . on ( 'success' , ( job ) => console . log ( ` Job " ${ job . attrs . name } " succeeded ` ) ) ;
agenda . on ( 'fail' , ( err , job ) => console . error ( ` Job " ${ job . attrs . name } " failed: ${ err . message } ` ) ) ;
await agenda . start ( ) ;
console . log ( 'Agenda job processing started.' ) ;
2025-07-10 18:31:11 -05:00
return agenda ;
2025-06-28 19:11:25 -05:00
} ;
module . exports = {
agenda ,
initializeAgenda
} ;