2025-06-28 19:11:25 -05:00
const { verify , hash } = require ( 'argon2' ) ;
const express = require ( 'express' ) ;
const { json } = require ( 'express' ) ;
const { Pool } = require ( 'pg' ) ;
require ( 'dotenv' ) . config ( ) ;
const jwt = require ( 'jsonwebtoken' ) ;
const http = require ( 'http' ) ;
const socketIo = require ( 'socket.io' ) ;
2025-12-31 17:10:14 -06:00
const connectDB = require ( './db' ) ;
2025-07-10 18:31:11 -05:00
const { initializeAgenda } = require ( './agenda' ) ; // Agenda setup
const format = require ( 'pg-format' ) ;
2025-12-23 17:22:33 -06:00
const cronParser = require ( 'cron-parser' ) ;
2026-01-01 12:55:34 -06:00
const { ObjectId } = require ( 'mongodb' ) ;
2025-06-28 19:11:25 -05:00
// ---------------------------------------------------------------------------
// Application wiring: Express app, Postgres pool, HTTP server and Socket.IO.
// ---------------------------------------------------------------------------
const app = express();
const port = 3000;
app.use(json());
const pool = new Pool();
const server = http.createServer(app);
const io = socketIo(server, {
  pingInterval: 15000,
  pingTimeout: 10000,
});

// Assigned asynchronously by the startup task below; route handlers that use
// it run inside try/catch, so a request arriving before it is set gets a 500.
let agenda;

// Startup task: connect to MongoDB, then initialise the Agenda scheduler.
// The promise is handled explicitly so a failure is reported with context
// and the process stops instead of continuing without a scheduler.
(async () => {
  await connectDB();
  agenda = await initializeAgenda('mongodb://localhost:27017/myScheduledApp', pool, io);
})().catch((err) => {
  console.error('Startup failed while initialising MongoDB/Agenda:', err);
  process.exit(1);
});

// Startup task: clear every `connected` flag left over from a previous run.
// The socket connection handler only claims token rows whose `connected` is
// FALSE, so stale TRUE flags would block clients from reconnecting.
(async () => {
  await pool.query("update user_tokens set connected=FALSE where connected=TRUE");
  await pool.query("update device_tokens set connected=FALSE where connected=TRUE");
})().catch((err) => {
  console.error('Startup failed while resetting connection flags:', err);
  process.exit(1);
});

// Secret used to sign and verify every JWT issued by this server.
const JWT_SECRET = process.env.JWT_SECRET;
// Lifetime of user session tokens (jsonwebtoken `expiresIn` format).
const TOKEN_EXPIRY = '5d';
2025-07-10 18:31:11 -05:00
/**
 * Socket.IO connection handler shared by user apps and peripheral devices.
 *
 * Sequence for every new socket:
 *   1. Authenticate the JWT taken from `handshake.auth.token` or, failing
 *      that, the second word of the `Authorization` header, and claim the
 *      matching `user_tokens` / `device_tokens` row by setting
 *      `connected=TRUE` and storing this socket's id. A row that is already
 *      connected (or does not exist) is answered with a 404 `error` event
 *      and the socket is closed.
 *   2. Register all event listeners. This happens BEFORE the initial payload
 *      is emitted so that a reply sent by the client immediately after
 *      `device_init` cannot arrive while no listener exists.
 *   3. Emit the initial payload: `device_init` (with stored positions) for a
 *      device, `success` for a user.
 *
 * Listener bodies never rethrow: they are async callbacks, so a rethrown
 * error would surface as an unhandled promise rejection.
 */
io.on('connection', async (socket) => {
  console.log("periph connected");

  // ------------------------------------------------------------------
  // Helpers scoped to this connection
  // ------------------------------------------------------------------

  // Resolves the device_id whose token row is bound to this socket.
  // Throws when this socket is not a currently-connected device.
  const requireDeviceId = async () => {
    const { rows } = await pool.query(
      "select device_id from device_tokens where socket=$1 and connected=TRUE",
      [socket.id]
    );
    if (rows.length !== 1) throw new Error("No device with that ID connected to Socket.");
    return rows[0].device_id;
  };

  // Emits `event` to the connected app socket of `userId`.
  // Resolves to true when exactly one connected socket was found.
  const emitToUser = async (userId, event, body) => {
    const { rows } = await pool.query(
      "select socket from user_tokens where user_id=$1 and connected=TRUE",
      [userId]
    );
    if (rows.length !== 1) return false;
    io.to(rows[0].socket).emit(event, body);
    return true;
  };

  // Releases the token row claimed by this socket and, for a device, tells
  // the owning user's app that the device went away.
  const handleDisconnect = async () => {
    try {
      if (socket.user) {
        console.log("user disconnect");
        await pool.query("update user_tokens set connected=FALSE where socket=$1", [socket.id]);
        return;
      }
      console.log("device disconnect");
      const { rows } = await pool.query(
        "update device_tokens set connected=FALSE where socket=$1 returning device_id",
        [socket.id]
      );
      if (rows.length !== 1) return;
      const deviceId = rows[0].device_id;
      const { rows: ownerRows } = await pool.query("select user_id from devices where id=$1", [deviceId]);
      if (ownerRows.length === 1) {
        await emitToUser(ownerRows[0].user_id, "device_disconnected", { deviceID: deviceId });
      }
    } catch (error) {
      console.error('Error handling socket disconnect:', error);
    }
  };

  // ------------------------------------------------------------------
  // 1. Authentication
  // ------------------------------------------------------------------
  const token = socket.handshake.auth.token ?? socket.handshake.headers['authorization']?.split(' ')[1];
  let payload;
  let id; // userId for user sockets, device id for peripheral sockets
  try {
    if (!token) throw new Error("no token!");
    payload = jwt.verify(token, JWT_SECRET);

    let table;
    let idCol;
    if (payload.type === "user") {
      table = "user_tokens";
      idCol = "user_id";
      socket.user = true;
      id = payload.userId;
    } else if (payload.type === "peripheral") {
      table = "device_tokens";
      idCol = "device_id";
      socket.user = false;
      id = payload.peripheralId;
    } else {
      throw new Error("Unauthorized");
    }

    // %I is filled by pg-format with the (fixed) identifiers chosen above;
    // the values themselves stay as bound parameters.
    const query = format(
      "update %I set connected=TRUE, socket=$1 where %I=$2 and token=$3 and connected=FALSE",
      table, idCol
    );
    const result = await pool.query(query, [socket.id, id, token]);
    if (result.rowCount !== 1) {
      socket.emit("error", {
        type: 'error',
        code: 404,
        message: 'Device not found or already connected'
      });
      socket.disconnect(true);
      return;
    }
  } catch (error) {
    console.error('Error during periph authentication:', error);
    // Send an error message to the client, then drop the connection.
    socket.emit('error', { type: 'error', code: 500 });
    socket.disconnect(true);
    return;
  }

  // ------------------------------------------------------------------
  // 2. Event listeners
  // ------------------------------------------------------------------
  socket.on("disconnect", handleDisconnect);
  if (!socket.connected) {
    // The client dropped while the claim query was in flight, so the
    // "disconnect" event fired before the listener above existed. Release
    // the row now; otherwise it would stay connected=TRUE and reject every
    // reconnect attempt.
    await handleDisconnect();
    return;
  }

  // Device reports its calibration status after connection.
  socket.on('report_calib_status', async (data) => {
    console.log(`Device reporting calibration status: ${socket.id}`);
    console.log(data);
    try {
      const deviceId = await requireDeviceId();
      // Update calibration status in database based on the device's actual state.
      if (data.port && typeof data.calibrated === 'boolean') {
        await pool.query(
          "update peripherals set calibrated=$1 where device_id=$2 and peripheral_number=$3",
          [data.calibrated, deviceId, data.port]
        );
        console.log(`Updated port ${data.port} calibrated status to ${data.calibrated}`);
      }
    } catch (error) {
      console.error('Error in report_calib_status:', error);
    }
  });

  // Device reports calibration error (motor stall, sensor failure, etc.).
  socket.on('device_calib_error', async (data) => {
    console.log(`Device reporting calibration error: ${socket.id}`);
    console.log(data);
    try {
      const deviceId = await requireDeviceId();
      const result = await pool.query(
        "update peripherals set await_calib=FALSE where device_id=$1 and peripheral_number=$2 returning id, user_id",
        [deviceId, data.port]
      );
      if (result.rowCount !== 1) throw new Error("No such peripheral in database");
      // Notify user app about the error.
      await emitToUser(result.rows[0].user_id, "calib_error", {
        periphID: result.rows[0].id,
        message: data.message || "Device error during calibration"
      });
      console.log(`Calibration cancelled for port ${data.port} due to device error`);
    } catch (error) {
      console.error('Error in device_calib_error:', error);
    }
  });

  // Device -> user relay: the device acknowledges it is ready for a
  // calibration stage; forward the same event to the owning user's app.
  const relayStageReady = (eventName, stageLabel) => {
    socket.on(eventName, async (data) => {
      console.log(`Device ready for ${stageLabel}: ${socket.id}`);
      console.log(data);
      try {
        const deviceId = await requireDeviceId();
        const result = await pool.query(
          "select id, user_id from peripherals where device_id=$1 and peripheral_number=$2",
          [deviceId, data.port]
        );
        if (result.rowCount !== 1) throw new Error("No such peripheral in database");
        await emitToUser(result.rows[0].user_id, eventName, { periphID: result.rows[0].id });
      } catch (error) {
        console.error(`Error in ${eventName}:`, error);
      }
    });
  };
  relayStageReady('calib_stage1_ready', 'stage 1 (tilt up)');
  relayStageReady('calib_stage2_ready', 'stage 2 (tilt down)');

  // User -> device relay: the user confirms a calibration stage is done;
  // forward the same event to the device. Only user sockets are accepted and
  // both lookups are scoped to the authenticated user, so one account cannot
  // drive or reset another account's hardware.
  const relayUserStage = (eventName, stageLabel) => {
    socket.on(eventName, async (data) => {
      console.log(`User confirms ${stageLabel} complete: ${socket.id}`);
      console.log(data);
      if (!socket.user) {
        console.warn(`Ignoring ${eventName} from non-user socket ${socket.id}`);
        return;
      }
      try {
        const { rows } = await pool.query(
          "select dt.socket from device_tokens dt join devices d on d.id=dt.device_id " +
          "where dt.device_id=$1 and dt.connected=TRUE and d.user_id=$2",
          [data.deviceID, id]
        );
        if (rows.length !== 1) {
          // Device not connected - notify app
          socket.emit("calib_error", {
            periphID: data.periphID,
            message: "Device disconnected during calibration"
          });
          // Reset peripheral state
          await pool.query(
            "update peripherals set await_calib=FALSE where id=$1 and user_id=$2",
            [data.periphID, id]
          );
          return;
        }
        io.to(rows[0].socket).emit(eventName, { port: data.periphNum });
      } catch (error) {
        console.error(`Error in ${eventName}:`, error);
        // `data` may be undefined when the client sent no payload.
        socket.emit("calib_error", {
          periphID: data?.periphID,
          message: "Error during calibration"
        });
      }
    });
  };
  relayUserStage('user_stage1_complete', 'stage 1');
  relayUserStage('user_stage2_complete', 'stage 2');

  // Device acknowledges calibration complete.
  socket.on('calib_done', async (data) => {
    console.log(`Received 'calib_done' event from client ${socket.id}:`);
    console.log(data);
    try {
      const deviceId = await requireDeviceId();
      const result = await pool.query(
        "update peripherals set await_calib=FALSE, calibrated=TRUE where device_id=$1 and peripheral_number=$2 returning id, user_id",
        [deviceId, data.port]
      );
      if (result.rowCount !== 1) throw new Error("No such peripheral in database");
      const delivered = await emitToUser(result.rows[0].user_id, "calib_done", { periphID: result.rows[0].id });
      if (!delivered) console.log("No App connected");
    } catch (error) {
      console.error('Error processing job:', error);
    }
  });

  // Device reports that a peripheral reached its target position.
  socket.on('pos_hit', async (data) => {
    console.log(`Received 'pos_hit' event from client ${socket.id}:`);
    console.log(data);
    const dateTime = new Date();
    try {
      const deviceId = await requireDeviceId();
      const result = await pool.query(
        "update peripherals set last_pos=$1, last_set=$2 where device_id=$3 and peripheral_number=$4 returning id, user_id",
        [data.pos, dateTime, deviceId, data.port]
      );
      if (result.rowCount !== 1) throw new Error("No such peripheral in database");
      const delivered = await emitToUser(result.rows[0].user_id, "pos_hit", { periphID: result.rows[0].id });
      if (!delivered) console.log("No App connected");
    } catch (error) {
      console.error('Error processing job:', error);
    }
  });

  // ------------------------------------------------------------------
  // 3. Initial payload
  // ------------------------------------------------------------------
  console.log("success - sending device state");
  if (payload.type !== "peripheral") {
    // User connection
    socket.emit("success", {
      type: 'success',
      code: 200,
      message: 'User connected'
    });
    return;
  }

  // Peripheral device: send the stored position of each of its peripherals
  // (the device reports its calibration status back afterwards). If the
  // lookup fails the device is still told it authenticated, with an empty
  // state list.
  let deviceState = [];
  try {
    const { rows: periphRows } = await pool.query(
      "select last_pos, peripheral_number from peripherals where device_id=$1",
      [id]
    );
    deviceState = periphRows.map((row) => ({
      port: row.peripheral_number,
      lastPos: row.last_pos
    }));
  } catch (err) {
    console.error("Error fetching device state:", err);
  }
  socket.emit("device_init", {
    type: 'success',
    code: 200,
    message: 'Device authenticated',
    deviceState
  });

  // Notify user app that device is now connected. Kept in its own try so a
  // failure here cannot trigger a second `device_init`.
  try {
    const { rows: deviceUserRows } = await pool.query("select user_id from devices where id=$1", [id]);
    if (deviceUserRows.length === 1) {
      await emitToUser(deviceUserRows[0].user_id, "device_connected", { deviceID: id });
    }
  } catch (err) {
    console.error("Error notifying user of device connection:", err);
  }
});
2025-06-28 19:11:25 -05:00
/**
 * Issues a new session JWT for a user and stores it as that user's only token.
 *
 * Every existing `user_tokens` row for the user is deleted before the new
 * token is inserted.
 *
 * @param {*} userId - Value embedded in the token payload and stored as `user_id`.
 * @returns {Promise<string>} The signed JWT, expiring after TOKEN_EXPIRY.
 */
async function createToken(userId) {
  const claims = { type: 'user', userId };
  const signed = jwt.sign(claims, JWT_SECRET, { expiresIn: TOKEN_EXPIRY });

  await pool.query("delete from user_tokens where user_id=$1", [userId]);
  await pool.query("insert into user_tokens (user_id, token) values ($1, $2)", [userId, signed]);

  return signed;
}
/**
 * Issues a JWT for a device and records it in `device_tokens`.
 *
 * The token is signed without an `expiresIn` option. Existing rows for the
 * device are not touched here.
 *
 * @param {*} peripheralId - Device id embedded in the payload and stored as `device_id`.
 * @returns {Promise<string>} The signed JWT.
 */
async function createPeripheralToken(peripheralId) {
  const claims = { type: 'peripheral', peripheralId };
  const signed = jwt.sign(claims, JWT_SECRET);

  await pool.query(
    "insert into device_tokens (device_id, token) values ($1, $2)",
    [peripheralId, signed]
  );

  return signed;
}
/**
 * Issues a short-lived (2 minute) JWT for a device and records it in
 * `device_tokens`.
 *
 * @param {*} peripheralId - Device id embedded in the payload and stored as `device_id`.
 * @returns {Promise<string>} The signed JWT.
 */
async function createTempPeriphToken(peripheralId) {
  const claims = { type: 'peripheral', peripheralId };
  const signed = jwt.sign(claims, JWT_SECRET, { expiresIn: '2m' });

  await pool.query(
    "insert into device_tokens (device_id, token) values ($1, $2)",
    [peripheralId, signed]
  );

  return signed;
}
/**
 * Express middleware that authenticates the bearer token of a request.
 *
 * The token is the second space-separated word of the `Authorization`
 * header. It must verify against JWT_SECRET and still be present in the
 * token table matching its `type` claim. On success the id from the payload
 * is exposed as `req.user` (type "user") or `req.peripheral` (type
 * "peripheral") and `next()` is called.
 *
 * Responses on failure:
 *   401 - no token supplied
 *   403 - token fails verification, has an unknown type, or is not stored
 *   500 - the token table lookup itself failed
 *
 * `next()` is called outside every try block so that an error thrown
 * downstream is not mistaken for an authentication failure.
 *
 * @param {object} req - Express request.
 * @param {object} res - Express response.
 * @param {Function} next - Express continuation callback.
 */
async function authenticateToken(req, res, next) {
  const authHeader = req.headers['authorization'];
  const token = authHeader?.split(' ')[1];
  if (!token) return res.sendStatus(401);

  let payload;
  try {
    payload = jwt.verify(token, JWT_SECRET);
  } catch {
    return res.sendStatus(403); // invalid/expired token
  }

  try {
    if (payload.type === 'user') {
      const { rows } = await pool.query("select user_id from user_tokens where token=$1", [token]);
      if (rows.length !== 1) return res.sendStatus(403);
      req.user = payload.userId; // make Id accessible in route handlers
    } else if (payload.type === 'peripheral') {
      const { rows } = await pool.query("select device_id from device_tokens where token=$1", [token]);
      if (rows.length !== 1) return res.sendStatus(403);
      req.peripheral = payload.peripheralId;
    } else {
      return res.sendStatus(403);
    }
  } catch (err) {
    // A database failure is a server fault, not a bad credential.
    console.error('Token lookup failed:', err);
    return res.sendStatus(500);
  }

  return next();
}
// GET / — unauthenticated smoke-test route that always answers with a fixed greeting.
app.get('/', function helloWorld(_req, res) {
  res.send('Hello World!');
});
// POST /login — exchanges an email/password pair for a user session token.
//
// Body: { email, password }
// Responses:
//   200 { token }  - credentials verified
//   400            - email or password missing or not a string
//   401            - unknown email or wrong password; both cases return the
//                    exact same body so the response does not reveal which
//                    emails are registered
//   500            - unexpected failure
app.post('/login', async (req, res) => {
  console.log('login');
  const { email, password } = req.body ?? {};
  // Only non-empty strings are passed on to the query and to argon2.
  if (typeof email !== 'string' || typeof password !== 'string' || !email || !password) {
    return res.status(400).json({ error: 'email and password required' });
  }

  const rejectCredentials = () => res.status(401).json({ error: 'Invalid credentials' });

  try {
    const { rows } = await pool.query('select id, password_hash_string from users where email = $1', [email]);
    if (rows.length === 0) return rejectCredentials();
    const user = rows[0];
    console.log('user found');

    const verified = await verify(user.password_hash_string, password);
    if (!verified) return rejectCredentials();
    console.log("password correct");

    const token = await createToken(user.id); // token is now tied to ID
    res.status(200).json({ token });
  } catch (err) {
    console.error(err);
    res.status(500).json({ error: 'Internal server error' });
  }
});
// POST /create_user — registers a new account.
//
// Body: { name, email, password }; an empty-string name is stored as NULL.
// Responses:
//   201 - user created
//   400 - email or password missing or not a string
//   409 - unique violation (Postgres 23505), reported as "Email already in use"
//   500 - unexpected failure
app.post('/create_user', async (req, res) => {
  console.log("got post req");
  const { name, email, password } = req.body ?? {};
  // Reject bad input up front; otherwise it fails inside hash() or the
  // insert and is reported as a 500.
  if (typeof email !== 'string' || typeof password !== 'string' || !email || !password) {
    return res.status(400).json({ error: 'email and password required' });
  }
  try {
    const hashedPass = await hash(password);
    await pool.query(
      "insert into users (name, email, password_hash_string) values (nullif($1, ''), $2, $3)",
      [name, email, hashedPass]
    );
    return res.sendStatus(201);
  } catch (err) {
    console.error(err);
    if (err.code === '23505') {
      return res.status(409).json({ error: 'Email already in use' });
    }
    return res.sendStatus(500);
  }
});
// GET /verify — confirms a user session and rotates its token.
//
// Responses:
//   200 { token } - a newly issued token that replaces the presented one
//   403           - the presented token is not a user token (authenticateToken
//                   also accepts device tokens, which leave req.user unset)
//   500           - token could not be issued
app.get('/verify', authenticateToken, async (req, res) => {
  if (req.user == null) return res.sendStatus(403);
  try {
    // Issue a new token to extend session
    const newToken = await createToken(req.user);
    res.status(200).json({ token: newToken });
  } catch (err) {
    console.error('Error refreshing user token:', err);
    res.status(500).json({ error: 'server error' });
  }
});
// GET /device_list — lists the authenticated user's devices as three
// parallel arrays (ids, names, max port counts), index-aligned.
app.get('/device_list', authenticateToken, async (req, res) => {
  try {
    console.log("device List request");
    console.log(req.user);

    const result = await pool.query(
      'select id, device_name, max_ports from devices where user_id = $1',
      [req.user]
    );

    const deviceIds = [];
    const deviceNames = [];
    const maxPorts = [];
    for (const row of result.rows) {
      deviceNames.push(row.device_name);
      deviceIds.push(row.id);
      maxPorts.push(row.max_ports);
    }

    res.status(200).json({ device_ids: deviceIds, devices: deviceNames, max_ports: maxPorts });
  } catch {
    res.status(500).json({ error: 'Internal Server Error' });
  }
});
// GET /device_name?deviceId= — returns the name and max port count of one
// device owned by the authenticated user; 404 when no such device exists.
app.get('/device_name', authenticateToken, async (req, res) => {
  console.log("deviceName");
  try {
    const result = await pool.query(
      'select device_name, max_ports from devices where id=$1 and user_id=$2',
      [req.query.deviceId, req.user]
    );
    if (result.rows.length !== 1) return res.sendStatus(404);

    const [device] = result.rows;
    res.status(200).json({ device_name: device.device_name, max_ports: device.max_ports });
  } catch {
    res.sendStatus(500);
  }
});
// GET /peripheral_list?deviceId= — lists the peripherals of one device that
// belong to the authenticated user, as three index-aligned arrays.
app.get('/peripheral_list', authenticateToken, async (req, res) => {
  console.log("periph list");
  try {
    const result = await pool.query(
      'select id, peripheral_number, peripheral_name from peripherals where device_id=$1 and user_id=$2',
      [req.query.deviceId, req.user]
    );

    const peripheralIds = [];
    const portNums = [];
    const peripheralNames = [];
    for (const row of result.rows) {
      peripheralIds.push(row.id);
      portNums.push(row.peripheral_number);
      peripheralNames.push(row.peripheral_name);
    }

    res.status(200).json({ peripheral_ids: peripheralIds, port_nums: portNums, peripheral_names: peripheralNames });
  } catch {
    res.sendStatus(500);
  }
});
// POST /add_device — creates a device row for the authenticated user and
// returns a temporary device token for it.
//
// Body: { deviceName, maxPorts }; a falsy maxPorts falls back to 4.
// Responses: 201 { token }, 409 on a unique violation, 500 otherwise.
app.post('/add_device', authenticateToken, async (req, res) => {
  try {
    console.log("add device request");
    console.log(req.user);
    console.log(req.peripheral);

    const { deviceName, maxPorts } = req.body;
    console.log(deviceName);

    // Default to 4 for multi-port devices
    const portCount = maxPorts || 4;
    const inserted = await pool.query(
      "insert into devices (user_id, device_name, max_ports) values ($1, $2, $3) returning id",
      [req.user, deviceName, portCount]
    );

    const newDeviceId = inserted.rows[0].id;
    const deviceInitToken = await createTempPeriphToken(newDeviceId);
    res.status(201).json({ token: deviceInitToken });
  } catch (err) {
    console.log(err);
    if (err.code === '23505') {
      return res.status(409).json({ error: 'Device Name in use' });
    }
    res.status(500).json({ error: 'Internal Server Error' });
  }
});
// POST /add_peripheral — attaches a peripheral to one of the caller's devices.
//
// Body: { device_id, port_num, peripheral_name }
// Responses:
//   201 - peripheral created
//   404 - device_id does not refer to a device owned by the caller
//   409 - unique violation (Postgres 23505)
//   500 - unexpected failure
app.post('/add_peripheral', authenticateToken, async (req, res) => {
  try {
    const { device_id, port_num, peripheral_name } = req.body;

    // Verify device belongs to user: other code selects peripherals by
    // device_id alone, so the insert must not be allowed to target a device
    // the caller does not own.
    const { rows: ownedRows } = await pool.query(
      "select id from devices where id=$1 and user_id=$2",
      [device_id, req.user]
    );
    if (ownedRows.length !== 1) return res.sendStatus(404);

    await pool.query(
      "insert into peripherals (device_id, peripheral_number, peripheral_name, user_id) values ($1, $2, $3, $4)",
      [device_id, port_num, peripheral_name, req.user]
    );
    res.sendStatus(201);
  } catch (err) {
    if (err.code === '23505') return res.sendStatus(409);
    console.error('Error adding peripheral:', err);
    res.sendStatus(500);
  }
});
// GET /verify_device — called with a device token; replaces every stored
// token of that device with a newly issued one.
//
// For a device whose max_ports is 1, a default peripheral ("Main Blind" on
// port 1) is created when the device has none yet.
//
// Responses: 200 { token, id }, 404 when the device row is missing, 500 otherwise.
// The issued token is a credential and is deliberately never logged.
app.get('/verify_device', authenticateToken, async (req, res) => {
  console.log("device verify");
  try {
    console.log(req.peripheral);

    // Check if this is a single-port device
    const { rows: deviceRows } = await pool.query(
      "select max_ports, user_id from devices where id=$1",
      [req.peripheral]
    );
    if (deviceRows.length === 0) {
      return res.status(404).json({ error: "Device not found" });
    }
    const maxPorts = deviceRows[0].max_ports;
    const userId = deviceRows[0].user_id;

    // For single-port devices, automatically create the peripheral if it doesn't exist
    if (maxPorts === 1) {
      const { rows: periphRows } = await pool.query(
        "select id from peripherals where device_id=$1",
        [req.peripheral]
      );
      if (periphRows.length === 0) {
        await pool.query(
          "insert into peripherals (device_id, peripheral_number, peripheral_name, user_id) values ($1, 1, $2, $3)",
          [req.peripheral, "Main Blind", userId]
        );
        console.log("Created default peripheral for single-port device");
      }
    }

    await pool.query("delete from device_tokens where device_id=$1", [req.peripheral]);
    const newToken = await createPeripheralToken(req.peripheral);
    console.log("Issued new device token for device", req.peripheral);
    res.json({ token: newToken, id: req.peripheral });
  } catch (error) {
    console.error("Error in verify_device:", error);
    res.status(500).json({ error: "server error" });
  }
});
// GET /position — called with a device token; returns one row per peripheral
// of that device (last_pos, peripheral_number, await_calib), or 404 when the
// device has no peripherals.
app.get('/position', authenticateToken, async (req, res) => {
  console.log("devicepos");
  try {
    const result = await pool.query(
      "select last_pos, peripheral_number, await_calib from peripherals where device_id=$1",
      [req.peripheral]
    );
    const peripherals = result.rows;
    if (peripherals.length === 0) return res.sendStatus(404);

    res.status(200).json(peripherals);
  } catch {
    res.status(500).json({ error: "server error" });
  }
});
// POST /manual_position_update — queues an immediate 'posChange' Agenda job
// for a single peripheral.
//
// Body: { periphId, periphNum, deviceId, newPos }
// Responds 202 with the job id; the position change happens in the background.
app.post('/manual_position_update', authenticateToken, async (req, res) => {
  console.log("setpos");
  try {
    const { periphId, periphNum, deviceId, newPos } = req.body;

    const jobData = {
      deviceID: deviceId,
      changedPosList: [{ periphNum, periphID: periphId, pos: newPos }],
      userID: req.user,
    };

    // Schedule the job to run immediately
    const job = await agenda.now('posChange', jobData);

    // 202 Accepted, as processing happens in background
    res.status(202).json({
      success: true,
      message: 'Request accepted for immediate processing.',
      jobId: job.attrs._id,
    });
  } catch (error) {
    console.error('Error triggering immediate action:', error);
    res.status(500).json({
      success: false,
      message: 'Failed to trigger immediate action',
      error: error.message,
    });
  }
});
// POST /calib — queues an immediate 'calib' Agenda job for one peripheral.
//
// Body: { periphId }
// Responds 202 with the job id, or 500 when the job could not be queued.
app.post('/calib', authenticateToken, async (req, res) => {
  console.log("calibrate");
  try {
    const jobData = { periphID: req.body.periphId, userID: req.user };

    // Schedule the job to run immediately
    const job = await agenda.now('calib', jobData);

    // 202 Accepted, as processing happens in background
    res.status(202).json({
      success: true,
      message: 'Request accepted for immediate processing.',
      jobId: job.attrs._id,
    });
  } catch (err) {
    console.error('Error triggering immediate action:', err);
    res.sendStatus(500);
  }
});
// POST /cancel_calib — queues an immediate 'cancel_calib' Agenda job for one
// peripheral.
//
// Body: { periphId }
// Responds 202 with the job id, or 500 when the job could not be queued.
app.post('/cancel_calib', authenticateToken, async (req, res) => {
  console.log("cancelCalib");
  try {
    const jobData = { periphID: req.body.periphId, userID: req.user };
    const job = await agenda.now('cancel_calib', jobData);

    // 202 Accepted, as processing happens in background
    res.status(202).json({
      success: true,
      message: 'Request accepted for immediate processing.',
      jobId: job.attrs._id,
    });
  } catch {
    res.sendStatus(500);
  }
});
// GET /peripheral_status?periphId= — returns position and calibration fields
// of one peripheral owned by the authenticated user; 404 when not found.
app.get('/peripheral_status', authenticateToken, async (req, res) => {
  console.log("status");
  try {
    const result = await pool.query(
      "select last_pos, last_set, calibrated, await_calib from peripherals where id=$1 and user_id=$2",
      [req.query.periphId, req.user]
    );
    if (result.rows.length !== 1) return res.sendStatus(404);

    const [status] = result.rows;
    res.status(200).json({
      last_pos: status.last_pos,
      last_set: status.last_set,
      calibrated: status.calibrated,
      await_calib: status.await_calib,
    });
  } catch {
    res.sendStatus(500);
  }
});
// GET /peripheral_name?periphId= — returns the name of one peripheral owned
// by the authenticated user as { name }; 404 when not found.
app.get('/peripheral_name', authenticateToken, async (req, res) => {
  // Log line names the route, like the other handlers do.
  console.log("peripheral name");
  try {
    const { periphId } = req.query;
    const { rows } = await pool.query(
      "select peripheral_name from peripherals where id=$1 and user_id=$2",
      [periphId, req.user]
    );
    if (rows.length !== 1) return res.sendStatus(404);
    res.status(200).json({ name: rows[0].peripheral_name });
  } catch {
    res.sendStatus(500);
  }
});
2025-12-28 14:06:52 -06:00
// GET /device_connection_status?deviceId= — reports whether a device owned
// by the authenticated user currently has a connected token row.
// Responds 200 { connected }, or 404 when the device is not the caller's.
app.get('/device_connection_status', authenticateToken, async (req, res) => {
  console.log("device connection status");
  try {
    const deviceId = req.query.deviceId;

    // Verify device belongs to user
    const ownership = await pool.query(
      "select id from devices where id=$1 and user_id=$2",
      [deviceId, req.user]
    );
    if (ownership.rows.length !== 1) return res.sendStatus(404);

    // Check if device has an active socket connection
    const active = await pool.query(
      "select connected from device_tokens where device_id=$1 and connected=TRUE",
      [deviceId]
    );
    const connected = active.rows.length > 0;
    res.status(200).json({ connected });
  } catch {
    res.sendStatus(500);
  }
});
2025-06-28 19:11:25 -05:00
// POST /completed_calib — called with a device token; marks the peripheral
// on the given port of that device as calibrated and no longer awaiting
// calibration.
//
// Body: { portNum }
// Responses: 204 on success, 404 when no row matched, 500 otherwise.
app.post('/completed_calib', authenticateToken, async (req, res) => {
  console.log("calibration complete");
  try {
    const { rowCount } = await pool.query(
      "update peripherals set calibrated=true, await_calib=false where device_id=$1 and peripheral_number=$2",
      [req.peripheral, req.body.portNum]
    );
    if (rowCount === 0) return res.sendStatus(404);

    res.sendStatus(204);
  } catch (error) {
    console.error(error);
    res.sendStatus(500);
  }
});
// POST /rename_device — renames a device owned by the authenticated user.
//
// Body: { deviceId, newName }
// Responses: 204 on success, 404 when no row matched, 409 on a unique
// violation (Postgres 23505), 500 otherwise.
app.post('/rename_device', authenticateToken, async (req, res) => {
  console.log("Hub rename");
  try {
    const { deviceId, newName } = req.body;
    const { rowCount } = await pool.query(
      "update devices set device_name=$1 where id=$2 and user_id=$3",
      [newName, deviceId, req.user]
    );
    if (rowCount === 0) return res.sendStatus(404);

    res.sendStatus(204);
  } catch (err) {
    if (err.code === '23505') return res.sendStatus(409);
    console.error(err);
    res.sendStatus(500);
  }
});
// POST /rename_peripheral — renames a peripheral owned by the authenticated user.
//
// Body: { periphId, newName }
// Responses: 204 on success, 404 when no row matched, 409 on a unique
// violation (Postgres 23505), 500 otherwise.
app.post('/rename_peripheral', authenticateToken, async (req, res) => {
  // Distinct log line so this route is not confused with /rename_device.
  console.log("Peripheral rename");
  try {
    const { periphId, newName } = req.body;
    const result = await pool.query(
      "update peripherals set peripheral_name=$1 where id=$2 and user_id=$3",
      [newName, periphId, req.user]
    );
    if (result.rowCount === 0) return res.sendStatus(404);
    res.sendStatus(204);
  } catch (err) {
    if (err.code === '23505') return res.sendStatus(409);
    console.error(err);
    res.sendStatus(500);
  }
});
// POST /delete_device — deletes a device owned by the authenticated user,
// removes its tokens and force-disconnects any live device socket.
//
// Body: { deviceId }
// Responses: 204 on success, 404 when the device is not the caller's, 500 otherwise.
app.post('/delete_device', authenticateToken, async (req, res) => {
  console.log("delete device");
  try {
    const { deviceId } = req.body;

    // Capture the connected socket ids BEFORE anything is deleted. If the
    // schema removes device_tokens rows together with the device (e.g. a
    // cascading foreign key — not visible from this file), a lookup made
    // after the delete would find nothing. The join restricts the lookup to
    // devices owned by the caller.
    const { rows: tokenRows } = await pool.query(
      "select dt.socket from device_tokens dt join devices d on d.id=dt.device_id " +
      "where dt.device_id=$1 and d.user_id=$2 and dt.connected=TRUE",
      [deviceId, req.user]
    );

    const { rows } = await pool.query(
      "delete from devices where user_id=$1 and id=$2 returning id",
      [req.user, deviceId]
    );
    if (rows.length !== 1) {
      return res.status(404).json({ error: 'Device not found' });
    }

    // Delete device tokens
    await pool.query("delete from device_tokens where device_id=$1", [rows[0].id]);

    // Forcefully disconnect the Socket.IO connection if device is connected
    for (const { socket: socketId } of tokenRows) {
      if (!socketId) continue;
      const deviceSocket = io.sockets.sockets.get(socketId);
      if (!deviceSocket) continue;
      console.log(`Forcefully disconnecting device socket ${socketId}`);
      deviceSocket.emit('device_deleted', { message: 'Device has been deleted from the account' });
      deviceSocket.disconnect(true);
    }

    res.sendStatus(204);
  } catch (error) {
    console.error("Error deleting device:", error);
    res.status(500).json({ error: "server error" });
  }
});
// POST /delete_peripheral — deletes a peripheral owned by the authenticated user.
//
// Body: { periphId }
// Responses: 204 on success, 404 when no such peripheral belongs to the
// caller, 500 otherwise.
app.post('/delete_peripheral', authenticateToken, async (req, res) => {
  console.log("delete peripheral");
  try {
    const { periphId } = req.body;
    const { rows } = await pool.query(
      "delete from peripherals where user_id = $1 and id=$2 returning id",
      [req.user, periphId]
    );
    if (rows.length !== 1) {
      // The missing resource here is a peripheral, not a device.
      return res.status(404).json({ error: 'Peripheral not found' });
    }
    res.sendStatus(204);
  } catch (err) {
    console.error("Error deleting peripheral:", err);
    res.sendStatus(500);
  }
});
2026-01-01 12:55:34 -06:00
/**
 * Build a five-field cron expression ("minute hour * * days") from a
 * time-of-day and a list of weekdays.
 *
 * The result is canonical: day tokens are de-duplicated and sorted, and
 * minute/hour are emitted as plain integers. Two requests that describe the
 * same schedule therefore always produce the same string, which is what the
 * string-equality duplicate check in `/add_schedule` relies on.
 *
 * @param {{ minute: number|string, hour: number|string }} time
 *   Minute (0-59) and hour (0-23); numeric strings such as "05" are accepted.
 * @param {Array<number|string>} daysOfWeek
 *   Non-empty list of cron day-of-week tokens: 0-7 or a three-letter name.
 * @returns {string} The cron expression, e.g. "30 8 * * 1,3,5".
 * @throws {TypeError} If `daysOfWeek` is not an array.
 * @throws {RangeError} If `daysOfWeek` is empty or holds an invalid token, or
 *   if minute/hour is missing, non-integer, or out of range.
 */
function createCronExpression(time, daysOfWeek) {
  const DAY_TOKEN = /^(?:[0-7]|[A-Za-z]{3})$/;

  // Coerce a minute/hour value to an integer in [0, max] or throw.
  const toTimeField = (value, max, label) => {
    const numeric =
      typeof value === 'string' && value.trim() !== '' ? Number(value) : value;
    if (
      typeof numeric !== 'number' ||
      !Number.isInteger(numeric) ||
      numeric < 0 ||
      numeric > max
    ) {
      throw new RangeError(`${label} must be an integer between 0 and ${max}`);
    }
    return numeric;
  };

  if (!Array.isArray(daysOfWeek)) {
    throw new TypeError('daysOfWeek must be an array');
  }
  if (daysOfWeek.length === 0) {
    throw new RangeError('daysOfWeek must not be empty');
  }

  const dayTokens = daysOfWeek.map((day) => String(day).trim());
  // Reject anything that is not a single day token; a token containing
  // spaces or commas would otherwise inject extra cron fields.
  const invalidDay = dayTokens.find((token) => !DAY_TOKEN.test(token));
  if (invalidDay !== undefined) {
    throw new RangeError(`Invalid day of week: "${invalidDay}"`);
  }

  const minute = toTimeField(time?.minute, 59, 'time.minute');
  const hour = toTimeField(time?.hour, 23, 'time.hour');
  const cronDays = [...new Set(dayTokens)].sort().join(',');

  return `${minute} ${hour} * * ${cronDays}`;
}
/**
 * Look up a schedule job by id, but only if it belongs to the given user.
 *
 * @param {*} jobId - Candidate Agenda job id, typically the string form of a
 *   MongoDB ObjectId taken from a request body.
 * @param {*} userId - Owner id matched against the job's `data.userID`.
 * @returns {Promise<object|null>} The matching job, or null when the id is
 *   malformed or exactly one job owned by `userId` was not found.
 */
async function findUserScheduleJob(jobId, userId) {
  // `new ObjectId()` throws on malformed input. A bad id from a client is a
  // "not found" condition, not a server error, so report it as null.
  if (!ObjectId.isValid(jobId)) {
    return null;
  }

  const jobs = await agenda.jobs({
    _id: new ObjectId(jobId),
    'data.userID': userId,
  });

  return jobs.length === 1 ? jobs[0] : null;
}
/**
 * POST /add_schedule
 *
 * Creates a recurring Agenda job ("posChangeScheduled") that applies `newPos`
 * to one peripheral at the given time on the given weekdays.
 *
 * Request body: { periphId, periphNum, deviceId, newPos, time: { hour, minute }, daysOfWeek }
 * Responses:
 *   201 - schedule created; body carries the new `jobId`
 *   400 - a required field is missing or has the wrong type
 *   409 - the user already has a schedule with the same cron expression for
 *         this peripheral
 *   500 - unexpected error (logged server-side)
 *
 * NOTE(review): this handler does not verify that `periphId` / `deviceId`
 * belong to `req.user` — confirm the "posChangeScheduled" job handler
 * enforces ownership before acting on the job data.
 */
app.post('/add_schedule', authenticateToken, async (req, res) => {
  console.log("add schedule");
  try {
    const { periphId, periphNum, deviceId, newPos, time, daysOfWeek } = req.body;

    // Validate required fields
    if (
      !periphId || periphNum === undefined || !deviceId || newPos === undefined ||
      !time || time.minute == null || time.hour == null ||
      !daysOfWeek || daysOfWeek.length === 0
    ) {
      return res.status(400).json({ error: 'Missing required fields' });
    }

    // Ids are embedded in a MongoDB filter below. Accept only scalars so a
    // JSON object such as {"$ne": null} cannot be used as a query operator.
    // Also reject shapes that would otherwise fail later as a 500.
    const isScalar = (value) => typeof value === 'string' || typeof value === 'number';
    if (
      !isScalar(periphId) || !isScalar(deviceId) ||
      typeof time !== 'object' || !Array.isArray(daysOfWeek)
    ) {
      return res.status(400).json({ error: 'Invalid field types' });
    }

    // Create cron expression
    const cronExpression = createCronExpression(time, daysOfWeek);

    // Check for a duplicate: same user, same peripheral, same cron expression.
    const existingJobs = await agenda.jobs({
      name: 'posChangeScheduled',
      'data.changedPosList.0.periphID': periphId,
      'data.userID': req.user,
    });
    const isDuplicate = existingJobs.some(
      (job) => job.attrs.repeatInterval === cronExpression
    );
    if (isDuplicate) {
      console.log("Duplicate schedule detected");
      return res.status(409).json({
        success: false,
        message: 'A schedule with the same time and days already exists for this blind',
        duplicate: true,
      });
    }

    const changedPosList = [{ periphNum, periphID: periphId, pos: newPos }];

    // Schedule the recurring job. `skipImmediate` keeps it from firing at
    // creation time; the first run is the next cron match.
    const job = await agenda.create('posChangeScheduled', {
      deviceID: deviceId,
      changedPosList,
      userID: req.user,
    });
    job.repeatEvery(cronExpression, { skipImmediate: true });
    await job.save();

    res.status(201).json({
      success: true,
      message: 'Schedule created successfully',
      jobId: job.attrs._id,
    });
  } catch (error) {
    console.error('Error creating schedule:', error);
    res.status(500).json({ success: false, message: 'Failed to create schedule', error: error.message });
  }
});
/**
 * POST /delete_schedule
 *
 * Removes one schedule job after confirming, via `findUserScheduleJob`, that
 * it belongs to the authenticated user.
 *
 * Request body: { jobId }
 * Responses: 200 on success, 400 when `jobId` is missing, 404 when no such
 * job is found for `req.user`, 500 on any thrown error.
 */
app.post('/delete_schedule', authenticateToken, async function deleteScheduleHandler(req, res) {
  console.log("delete schedule");

  // One responder shared by every exit path keeps the branches uniform.
  const reply = (status, payload) => res.status(status).json(payload);

  try {
    const scheduleId = req.body.jobId;
    if (!scheduleId) {
      return reply(400, { error: 'Missing jobId' });
    }

    // The lookup is scoped to the caller, so another user's job reads as absent.
    const ownedJob = await findUserScheduleJob(scheduleId, req.user);
    if (!ownedJob) {
      return reply(404, { error: 'Schedule not found' });
    }

    await ownedJob.remove();
    console.log("Schedule deleted successfully:", scheduleId);

    reply(200, {
      success: true,
      message: 'Schedule deleted successfully'
    });
  } catch (failure) {
    console.error('Error deleting schedule:', failure);
    reply(500, {
      success: false,
      message: 'Failed to delete schedule',
      error: failure.message
    });
  }
});
/**
 * POST /update_schedule
 *
 * Replaces one of the authenticated user's schedule jobs with a new
 * "posChangeScheduled" job built from the request body.
 *
 * The replacement is built and saved BEFORE the old job is removed. If
 * building the cron expression or saving fails, the user's existing schedule
 * is left untouched instead of being lost.
 *
 * Request body: { jobId, periphId, periphNum, deviceId, newPos, time: { hour, minute }, daysOfWeek }
 * Responses:
 *   200 - schedule replaced; body carries the replacement job's `jobId`
 *   400 - a required field is missing or has the wrong type
 *   404 - no job with that id belongs to `req.user`
 *   500 - unexpected error (logged server-side)
 *
 * NOTE(review): this handler does not verify that `periphId` / `deviceId`
 * belong to `req.user` — confirm the "posChangeScheduled" job handler
 * enforces ownership before acting on the job data.
 */
app.post('/update_schedule', authenticateToken, async (req, res) => {
  console.log("update schedule");
  try {
    const { jobId, periphId, periphNum, deviceId, newPos, time, daysOfWeek } = req.body;

    // Validate required fields
    if (
      !jobId || !periphId || periphNum === undefined || !deviceId || newPos === undefined ||
      !time || time.minute == null || time.hour == null ||
      !daysOfWeek || daysOfWeek.length === 0
    ) {
      console.log("Missing required fields");
      return res.status(400).json({ error: 'Missing required fields' });
    }

    // Reject shapes that would otherwise fail later as a 500.
    const isScalar = (value) => typeof value === 'string' || typeof value === 'number';
    if (
      !isScalar(periphId) || !isScalar(deviceId) ||
      typeof time !== 'object' || !Array.isArray(daysOfWeek)
    ) {
      return res.status(400).json({ error: 'Invalid field types' });
    }

    // Find and verify the existing job belongs to the user
    const existingJob = await findUserScheduleJob(jobId, req.user);
    if (!existingJob) {
      console.log("Schedule not found");
      return res.status(404).json({ error: 'Schedule not found' });
    }

    // Build and persist the replacement first; nothing has been deleted yet,
    // so a failure here leaves the current schedule in place.
    const cronExpression = createCronExpression(time, daysOfWeek);
    const changedPosList = [{ periphNum, periphID: periphId, pos: newPos }];
    console.log("Creating new job with cron:", cronExpression);

    const job = await agenda.create('posChangeScheduled', {
      deviceID: deviceId,
      changedPosList,
      userID: req.user,
    });
    job.repeatEvery(cronExpression, { skipImmediate: true });
    await job.save();

    // Only now retire the old job. If that fails, undo the replacement so the
    // user does not end up with two schedules.
    try {
      await existingJob.remove();
    } catch (removeError) {
      try {
        await job.remove();
      } catch (rollbackError) {
        console.error('Error rolling back replacement schedule:', rollbackError);
      }
      throw removeError;
    }

    console.log("Job saved successfully with ID:", job.attrs._id);
    res.status(200).json({
      success: true,
      message: 'Schedule updated successfully',
      jobId: job.attrs._id,
    });
  } catch (error) {
    console.error('Error updating schedule:', error);
    res.status(500).json({ success: false, message: 'Failed to update schedule', error: error.message });
  }
});
2025-06-28 19:11:25 -05:00
// Start the HTTP server on `port` and log the local URL once it is listening.
server.listen(port, () => {
  console.log(`Example app listening at http://localhost:${port}`);
});
/**
 * POST /periph_schedule_list
 *
 * Lists the authenticated user's recurring "posChangeScheduled" jobs that
 * target exactly one peripheral, `periphId`.
 *
 * Request body: { periphId }
 * Responses:
 *   200 - { scheduledUpdates: [{ id, schedule, pos }] } where `schedule`
 *         holds the parsed cron fields (minutes, hours, daysOfMonth, months,
 *         daysOfWeek)
 *   400 - `periphId` is missing or is not a string/number
 *   500 - unexpected error (logged server-side)
 *
 * Jobs without a repeat interval, and jobs whose interval cannot be parsed
 * as cron, are omitted from the result.
 */
app.post('/periph_schedule_list', authenticateToken, async (req, res) => {
  try {
    console.log("Schedule List request for user:", req.user);
    const { periphId } = req.body;
    if (!periphId) {
      return res.status(400).json({ error: 'periphId is required in the request body.' });
    }
    // `periphId` is embedded in a MongoDB filter below. Accept only scalars so
    // a JSON object such as {"$ne": null} cannot act as a query operator.
    if (typeof periphId !== 'string' && typeof periphId !== 'number') {
      return res.status(400).json({ error: 'periphId must be a string or number.' });
    }

    const jobs = await agenda.jobs({
      name: 'posChangeScheduled',
      'data.changedPosList': { $size: 1 },
      'data.changedPosList.0.periphID': periphId,
      'data.userID': req.user,
    });

    // flatMap lets each job contribute zero entries (skipped) or one entry.
    const details = jobs.flatMap((job) => {
      const { _id, repeatInterval, repeatTimezone, data } = job.attrs;
      if (!repeatInterval) {
        return []; // not a recurring job
      }
      try {
        const { fields } = cronParser.parseExpression(repeatInterval, {
          tz: repeatTimezone || undefined,
        });
        return [{
          id: _id,
          schedule: {
            minutes: fields.minute,
            hours: fields.hour,
            daysOfMonth: fields.dayOfMonth,
            months: fields.month,
            daysOfWeek: fields.dayOfWeek,
          },
          pos: data.changedPosList[0].pos,
        }];
      } catch (err) {
        // One unparsable job must not fail the whole listing; log why and skip it.
        console.error(`Could not parse "${repeatInterval}" for job ${_id}. Skipping.`, err);
        return [];
      }
    });

    res.status(200).json({ scheduledUpdates: details });
  } catch (error) {
    console.error("Error in /periph_schedule_list:", error);
    res.status(500).json({ error: 'Internal Server Error' });
  }
});