From 2549bc3b20392de582297b0d884e6825651ce756 Mon Sep 17 00:00:00 2001 From: Aditya Pulipaka Date: Thu, 10 Jul 2025 18:31:11 -0500 Subject: [PATCH] Direct Control --- agenda.js | 155 +++++++++++++++++++++++++++-- db.js | 8 +- index.js | 245 +++++++++++++++++++++++++++++++++------------- package-lock.json | 10 ++ package.json | 1 + 5 files changed, 336 insertions(+), 83 deletions(-) diff --git a/agenda.js b/agenda.js index 9c3e993..7fe5b87 100644 --- a/agenda.js +++ b/agenda.js @@ -1,13 +1,12 @@ // agenda.js (modified to receive the already created pool) const Agenda = require('agenda'); -const socketIo = require('socket.io'); let agenda; -let wssInstance; +let socketIoInstance; let sharedPgPool; // This will hold the pool instance passed from server.js -const initializeAgenda = async (mongoUri, pool, wss) => { // Now accepts pgPool - wssInstance = wss; +const initializeAgenda = async (mongoUri, pool, io) => { // Now accepts pgPool + socketIoInstance = io; sharedPgPool = pool; // Store the passed pool agenda = new Agenda({ @@ -17,17 +16,152 @@ const initializeAgenda = async (mongoUri, pool, wss) => { // Now accepts pgPool } }); - agenda.define('manual update position', async (job) => { - const { recordId, dataToUpdate, conditionCheck } = job.attrs.data; - console.log(`Processing job for recordId: ${recordId} at ${new Date()}`); + agenda.define('calib', async (job) => { + const {periphID, userID } = job.attrs.data; try { + const result = await sharedPgPool.query("update peripherals set await_calib=TRUE, calibrated=FALSE where id=$1 and user_id=$2 returning device_id, peripheral_number, id", + [periphID, userID] + ); + if (result.rowCount != 1) throw new Error("No such peripheral in database"); + const {rows} = await sharedPgPool.query("select socket from device_tokens where device_id=$1 and connected=TRUE", [result.rows[0].device_id]); + if (rows.length != 1) console.log("No device with that ID connected to Socket."); + else { + const socket = rows[0].socket; + if (socket) { + socketIoInstance.to(socket).emit("calib", {periphNum: result.rows[0].peripheral_number}); + } + } - const updatedRecord = result.rows[0]; - + const {rows: userRows} = await sharedPgPool.query("select socket from user_tokens where user_id=$1", [userID]); + if (userRows.length == 1) { + if (userRows[0]){ + const userSocket = userRows[0].socket; + socketIoInstance.to(userSocket).emit("calib", {periphID: result.rows[0].id}); + } + } + else console.log("No App connected"); } catch (error) { - console.error(`Error processing job for recordId ${recordId}:`, error); + console.error(`Error processing job:`, error); + throw error; + } + }); + + agenda.define('cancel_calib', async (job) => { + const {periphID, userID } = job.attrs.data; + try { + const result = await sharedPgPool.query("update peripherals set await_calib=FALSE where id=$1 and user_id=$2 returning device_id, peripheral_number", + [periphID, userID] + ); + if (result.rowCount != 1) throw new Error("No such peripheral in database"); + + const {rows} = await sharedPgPool.query("select socket from device_tokens where device_id=$1 and connected=TRUE", [result.rows[0].device_id]); + if (rows.length != 1) console.log("No device with that ID connected to Socket."); + else { + const socket = rows[0].socket; + if (socket) { + socketIoInstance.to(socket).emit("cancel_calib", {periphNum: result.rows[0].peripheral_number}); + } + } + + const {rows: userRows} = await sharedPgPool.query("select socket from user_tokens where user_id=$1", [userID]); + + if (userRows.length == 1) { + if (userRows[0]){ + const userSocket = userRows[0].socket; + socketIoInstance.to(userSocket).emit("cancel_calib", {periphID: result.rows[0].id}); + } + } + else console.log("No App connected"); + } catch (error) { + console.error(`Error processing job:`, error); + throw error; + } + }) + + agenda.define('posChange', async (job) => { + const { deviceID, changedPosList, userID } = job.attrs.data; + // changedPosList is of the structure [{periphNum, periphID, pos}] + const dateTime = new Date(); + if (!changedPosList) console.log("undefined list"); + try { + const posWithoutID = changedPosList.map(pos => { + const { periphID, ...rest } = pos; + return rest; + }); + + const posWithoutNumber = changedPosList.map(pos => { + const { periphNum, ...rest } = pos; + return rest; + }); + + for (const pos of changedPosList) { + const result = await sharedPgPool.query("update peripherals set last_pos=$1, last_set=$2 where id=$3 and user_id=$4", + [pos.pos, dateTime, pos.periphID, userID] + ); + if (result.rowCount != 1) throw new Error("No such peripheral in database"); + } + + const {rows} = await sharedPgPool.query("select socket from device_tokens where device_id=$1 and connected=TRUE", [deviceID]); + if (rows.length != 1) console.log("No device with that ID connected to Socket."); + else { + const socket = rows[0].socket; + if (socket) { + socketIoInstance.to(socket).emit("posUpdates", posWithoutID); + } + } + + } catch (error) { + console.error(`Error processing job:`, error); + throw error; + } + }); + + agenda.define('posChangeScheduled', async (job) => { + const { deviceID, changedPosList, userID } = job.attrs.data; + // changedPosList is of the structure [{periphNum, periphID, pos}] + const dateTime = new Date(); + if (!changedPosList) console.log("undefined list"); + try { + const posWithoutID = changedPosList.map(pos => { + const { periphID, ...rest } = pos; + return rest; + }); + + const posWithoutNumber = changedPosList.map(pos => { + const { periphNum, ...rest } = pos; + return rest; + }); + + for (const pos of changedPosList) { + const result = await sharedPgPool.query("update peripherals set last_pos=$1, last_set=$2 where id=$3 and user_id=$4", + [pos.pos, dateTime, pos.periphID, userID] + ); + if (result.rowCount != 1) throw new Error("No such peripheral in database"); + } + + const {rows} = await sharedPgPool.query("select socket from device_tokens where device_id=$1 and connected=TRUE", [deviceID]); + if (rows.length != 1) console.log("No device with that ID connected to Socket."); + else { + const socket = rows[0].socket; + if (socket) { + socketIoInstance.to(socket).emit("posUpdates", posWithoutID); + } + } + + const {rows: userRows} = await sharedPgPool.query("select socket from user_tokens where user_id=$1", [userID]); + + if (userRows.length == 1) { + if (userRows[0]){ + const userSocket = userRows[0].socket; + socketIoInstance.to(userSocket).emit("posUpdates", posWithoutNumber); + } + } + else console.log("No App connected"); + + } catch (error) { + console.error(`Error processing job:`, error); throw error; } }); @@ -40,6 +174,7 @@ const initializeAgenda = async (mongoUri, pool, wss) => { // Now accepts pgPool await agenda.start(); console.log('Agenda job processing started.'); + return agenda; }; module.exports = { diff --git a/db.js b/db.js index 246547b..8532382 100644 --- a/db.js +++ b/db.js @@ -3,12 +3,8 @@ const mongoose = require('mongoose'); const connectDB = async () => { try { - const mongoUri = process.env.MONGO_URI || 'mongodb://localhost:27017/myScheduledApp'; - await mongoose.connect(mongoUri, { - // These options are often recommended for newer Mongoose versions, though some might be deprecated in future - // useNewUrlParser: true, - // useUnifiedTopology: true, - }); + const mongoUri = 'mongodb://localhost:27017/myScheduledApp'; + await mongoose.connect(mongoUri); console.log('MongoDB connected successfully for Mongoose!'); } catch (err) { console.error('MongoDB connection error (Mongoose):', err); diff --git a/index.js b/index.js index c48d8ba..83d1aab 100644 --- a/index.js +++ b/index.js @@ -6,62 +6,162 @@ require('dotenv').config(); const jwt = require('jsonwebtoken'); const http = require('http'); const socketIo = require('socket.io'); +const connectDB = require('./db'); // Your Mongoose DB connection +const { initializeAgenda } = require('./agenda'); // Agenda setup +const format = require('pg-format'); const app = express(); const port = 3000; app.use(json()); const pool = new Pool(); const server = http.createServer(app); -const io = socketIo(server) +const io = socketIo(server, { + pingInterval: 15000, + pingTimeout: 10000, +}); +let agenda; + +(async () => { + // 1. Connect to MongoDB (for your application data) + await connectDB(); + + agenda = await initializeAgenda('mongodb://localhost:27017/myScheduledApp', pool, io); +})(); + +(async () => { + await pool.query("update user_tokens set connected=FALSE where connected=TRUE"); + await pool.query("update device_tokens set connected=FALSE where connected=TRUE"); +})(); const JWT_SECRET = process.env.JWT_SECRET; const TOKEN_EXPIRY = '5d'; -io.on('connection', (socket) => { +io.on('connection', async (socket) => { console.log("periph connected"); - socket.on("authenticate", async (data) => { - try { - const authData = JSON.parse(data); - const token = authData.token; - const periphId = authData.id; + const token = socket.handshake.auth.token ?? socket.handshake.headers['authorization']?.split(' ')[1]; + try { + if (!token) throw new Error("no token!"); + const payload = jwt.verify(token, JWT_SECRET); + let table; + let idCol; + let id; + + if (payload.type === "user") { + table = "user_tokens"; + idCol = "user_id"; + socket.user = true; + id = payload.userId; + } + else if (payload.type === "peripheral") { + table = "device_tokens"; + idCol = "device_id"; + socket.user = false; + id = payload.peripheralId; + } + else { + throw new Error("Unauthorized"); + } - const {rows} = await pool.query("select * from device_tokens where device_id=$1 and token=$2 and connected=FALSE", - [periphId, token] - ); + const query = format(`update %I set connected=TRUE, socket=$1 where %I=$2 and token=$3 and connected=FALSE`, + table, idCol + ); - if (rows.length != 1) { - const errorResponse = { - type: 'error', - code: 404, - message: 'Device not found' - }; - socket.emit("error", errorResponse); - socket.disconnect(true); - } - - else { - await pool.query("update device_tokens set connected=TRUE where device_id=$1 and token=$2 and connected=FALSE", - [periphId, token] - ); - const successResponse = { - type: 'success', - code: 200, - message: 'Device found' - }; - socket.emit("success", successResponse); - } + const result = await pool.query(query, [socket.id, id, token]); - } catch (error) { - console.error('Error during periph authentication:', error); - - // Send an error message to the client - socket.emit('error', { type: 'error', code: 500 }); - - // Disconnect the client + if (result.rowCount != 1) { + const errorResponse = { + type: 'error', + code: 404, + message: 'Device not found or already connected' + }; + socket.emit("error", errorResponse); socket.disconnect(true); } - }) -}) + + else { + const successResponse = { + type: 'success', + code: 200, + message: 'Device found' + }; + console.log("success"); + socket.emit("success", successResponse); + } + + } catch (error) { + console.error('Error during periph authentication:', error); + + // Send an error message to the client + socket.emit('error', { type: 'error', code: 500 }); + + // Disconnect the client + socket.disconnect(true); + } + + socket.on('calib_done', async (data) => { + console.log(`Received 'calib_done' event from client ${socket.id}:`); + console.log(data); + try { + const {rows} = await pool.query("select device_id from device_tokens where socket=$1 and connected=TRUE", [socket.id]); + if (rows.length != 1) throw new Error("No device with that ID connected to Socket."); + const result = await pool.query("update peripherals set await_calib=FALSE, calibrated=TRUE where device_id=$1 and peripheral_number=$2 returning id, user_id", + [rows[0].device_id, data.port] + ); + if (result.rowCount != 1) throw new Error("No such peripheral in database"); + + const {rows: userRows} = await pool.query("select socket from user_tokens where user_id=$1", [result.rows[0].user_id]); + if (userRows.length == 1) { + if (userRows[0]){ + const userSocket = userRows[0].socket; + io.to(userSocket).emit("calib_done", {periphID: result.rows[0].id}); + } + } + else console.log("No App connected"); + + } catch (error) { + console.error(`Error processing job:`, error); + throw error; + } + }); + + socket.on('pos_hit', async (data) => { + console.log(`Received 'pos_hit' event from client ${socket.id}:`); + console.log(data); + const dateTime = new Date(); + try { + const {rows} = await pool.query("select device_id from device_tokens where socket=$1 and connected=TRUE", [socket.id]); + if (rows.length != 1) throw new Error("No device with that ID connected to Socket."); + const result = await pool.query("update peripherals set last_pos=$1, last_set=$2 where device_id=$3 and peripheral_number=$4 returning id, user_id", + [data.pos, dateTime, rows[0].device_id, data.port] + ); + if (result.rowCount != 1) throw new Error("No such peripheral in database"); + + const {rows: userRows} = await pool.query("select socket from user_tokens where user_id=$1", [result.rows[0].user_id]); + if (userRows.length == 1) { + if (userRows[0]){ + const userSocket = userRows[0].socket; + io.to(userSocket).emit("pos_hit", {periphID: result.rows[0].id}); + } + } + else console.log("No App connected"); + + } catch (error) { + console.error(`Error processing job:`, error); + throw error; + } + }); + + socket.on("disconnect", async () => { + if (socket.user) { + console.log("user disconnect"); + await pool.query("update user_tokens set connected=FALSE where socket=$1", [socket.id]); + } + else { + console.log("device disconnect"); + await pool.query("update device_tokens set connected=FALSE where socket=$1", [socket.id]); + } + }); +}); async function createToken(userId) { const token = jwt.sign({ type: 'user', userId }, JWT_SECRET, { expiresIn: TOKEN_EXPIRY }); @@ -99,7 +199,10 @@ async function authenticateToken(req, res, next) { const {rows} = await pool.query("select device_id from device_tokens where token=$1", [token]); if (rows.length != 1) throw new Error("Invalid/Expired Token"); req.peripheral = payload.peripheralId; - } + } + else { + throw new Error("Invalid/Expired Token"); + } next(); } catch { return res.sendStatus(403); // invalid/expired token @@ -107,7 +210,6 @@ async function authenticateToken(req, res, next) { } app.get('/', (req, res) => { - // console.log(req); res.send('Hello World!'); }); @@ -255,16 +357,14 @@ app.get('/verify_device', authenticateToken, async (req, res) => { app.get('/position', authenticateToken, async (req, res) => { console.log("devicepos"); try { - const {rows} = await pool.query("select * from peripherals where device_id=$1", [req.peripheral]); + const {rows} = await pool.query("select last_pos, peripheral_number, await_calib from peripherals where device_id=$1", + [req.peripheral]); + if (rows.length == 0) { return res.sendStatus(404); } - const lastPosList = rows.map(row => row.last_pos); - const portNums = rows.map(row => row.peripheral_number); - const awaitCalibList = rows.map(row => row.await_calib); - const ready = rows.map(row => row.calibrated); - res.status(200).json({positions: lastPosList, port_nums: portNums, calib_needed: awaitCalibList, ready: ready}); + res.status(200).json(rows); } catch { res.status(500).json({error: "server error"}); } @@ -273,17 +373,20 @@ app.get('/position', authenticateToken, async (req, res) => { app.post('/manual_position_update', authenticateToken, async (req, res) => { console.log("setpos"); try { - const {periphId, newPos, time} = req.body; - const dateTime = new Date(time); - const result = await pool.query("update peripherals set last_pos=$1, last_set=$2 where id=$3 and user_id=$4", - [newPos, dateTime, periphId, req.user] - ); - - if (result.rowCount === 0) return res.sendStatus(404); + const {periphId, periphNum, deviceId, newPos} = req.body; + const changedPosList = [{periphNum: periphNum, periphID: periphId, pos: newPos}]; - res.sendStatus(204); - } catch { - res.status(500).json({error: "server error"}); + // Schedule the job to run immediately + const job = await agenda.now('posChange', {deviceID: deviceId, changedPosList: changedPosList, userID: req.user}); + + res.status(202).json({ // 202 Accepted, as processing happens in background + success: true, + message: 'Request accepted for immediate processing.', + jobId: job.attrs._id + }); + } catch (error) { + console.error('Error triggering immediate action:', error); + res.status(500).json({ success: false, message: 'Failed to trigger immediate action', error: error.message }); } }); @@ -291,11 +394,16 @@ app.post('/calib', authenticateToken, async (req, res) => { console.log("calibrate"); try { const {periphId} = req.body; - const result = await pool.query("update peripherals set await_calib=true where id=$1 and user_id=$2", - [periphId, req.user]); - if (result.rowCount === 0) return res.sendStatus(404); - res.sendStatus(204); - } catch { + // Schedule the job to run immediately + const job = await agenda.now('calib', {periphID: periphId, userID: req.user}); + + res.status(202).json({ // 202 Accepted, as processing happens in background + success: true, + message: 'Request accepted for immediate processing.', + jobId: job.attrs._id + }); + } catch (err) { + console.error('Error triggering immediate action:', err); res.sendStatus(500); } }) @@ -304,10 +412,13 @@ app.post('/cancel_calib', authenticateToken, async (req, res) => { console.log("cancelCalib"); try { const {periphId} = req.body; - const result = await pool.query("update peripherals set await_calib=false where id=$1 and user_id=$2", - [periphId, req.user]); - if (result.rowCount === 0) return res.sendStatus(404); - res.sendStatus(204); + const job = await agenda.now('cancel_calib', {periphID: periphId, userID: req.user}); + + res.status(202).json({ // 202 Accepted, as processing happens in background + success: true, + message: 'Request accepted for immediate processing.', + jobId: job.attrs._id + }); } catch { res.sendStatus(500); } diff --git a/package-lock.json b/package-lock.json index 0075057..0ed4036 100644 --- a/package-lock.json +++ b/package-lock.json @@ -17,6 +17,7 @@ "mongoose": "^8.16.1", "node-schedule": "^2.1.1", "pg": "^8.16.0", + "pg-format": "^1.0.4", "socket.io": "^4.8.1" } }, @@ -2811,6 +2812,15 @@ "integrity": "sha512-nkc6NpDcvPVpZXxrreI/FOtX3XemeLl8E0qFr6F2Lrm/I8WOnaWNhIPK2Z7OHpw7gh5XJThi6j6ppgNoaT1w4w==", "license": "MIT" }, + "node_modules/pg-format": { + "version": "1.0.4", + "resolved": "https://registry.npmjs.org/pg-format/-/pg-format-1.0.4.tgz", + "integrity": "sha512-YyKEF78pEA6wwTAqOUaHIN/rWpfzzIuMh9KdAhc3rSLQ/7zkRFcCgYBAEGatDstLyZw4g0s9SNICmaTGnBVeyw==", + "license": "MIT", + "engines": { + "node": ">=4.0" + } + }, "node_modules/pg-int8": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/pg-int8/-/pg-int8-1.0.1.tgz", diff --git a/package.json b/package.json index d858ca1..7ea910b 100644 --- a/package.json +++ b/package.json @@ -18,6 +18,7 @@ "mongoose": "^8.16.1", "node-schedule": "^2.1.1", "pg": "^8.16.0", + "pg-format": "^1.0.4", "socket.io": "^4.8.1" } }