Skip to content

Commit

Permalink
Merge pull request #87 from nemozak1/opeyII_integration
Browse files Browse the repository at this point in the history
bugfix/remove potentially broken packages and redundancies
  • Loading branch information
simonredfern authored Feb 21, 2025
2 parents eec0b64 + cd13f05 commit 648d35e
Show file tree
Hide file tree
Showing 7 changed files with 254 additions and 1,230 deletions.
6 changes: 0 additions & 6 deletions babel.config.js

This file was deleted.

21 changes: 7 additions & 14 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
"version": "1.1.3",
"private": true,
"types": [
"jest",
"node"
"jest"
],
"scripts": {
"dev": "vite & ts-node server/app.ts",
Expand All @@ -22,9 +21,8 @@
"@element-plus/icons-vue": "^2.1.0",
"@fontsource/roboto": "^5.0.0",
"@highlightjs/vue-plugin": "^2.1.0",
"@types/jest": "^29.5.14",
"@types/supertest": "^6.0.2",
"ai": "^4.1.11",
"@types/node-fetch": "^2.6.12",
"ai": "^4.1.43",
"axios": "^1.7.4",
"cheerio": "^1.0.0",
"class-transformer": "^0.5.1",
Expand All @@ -34,12 +32,11 @@
"element-plus": "^2.3.9",
"express": "^4.21.0",
"express-session": "^1.17.3",
"got": "^14.4.5",
"highlight.js": "^11.8.0",
"json-editor-vue": "^0.17.3",
"jsonwebtoken": "^9.0.2",
"markdown-it": "^14.1.0",
"node-fetch": "v2.6",
"node-fetch": "^2.6.7",
"oauth": "^0.10.0",
"obp-typescript": "^1.0.36",
"pinia": "^2.0.37",
Expand All @@ -49,7 +46,6 @@
"routing-controllers": "^0.10.4",
"socket.io": "^4.7.5",
"socket.io-client": "^4.7.5",
"supertest": "^7.0.0",
"typedi": "^0.10.0",
"uuid": "^9.0.1",
"vanilla-jsoneditor": "^2.3.3",
Expand All @@ -60,22 +56,18 @@
"ws": "^8.18.0"
},
"devDependencies": {
"@babel/core": "^7.26.8",
"@babel/preset-env": "^7.26.8",
"@babel/preset-typescript": "^7.26.0",
"@rushstack/eslint-patch": "^1.4.0",
"@types/express": "^5.0.0",
"@types/jest": "^29.5.14",
"@types/jsdom": "^21.1.7",
"@types/jsonwebtoken": "^9.0.6",
"@types/markdown-it": "^14.1.1",
"@types/node": "^22.13.4",
"@types/node": "^20.5.7",
"@vitejs/plugin-vue": "^4.3.0",
"@vitejs/plugin-vue-jsx": "^3.1.0",
"@vue/eslint-config-prettier": "^9.0.0",
"@vue/eslint-config-typescript": "^14.0.0",
"@vue/test-utils": "^2.4.0",
"@vue/tsconfig": "^0.1.3",
"babel-jest": "^29.7.0",
"eslint": "^9.15.0",
"eslint-plugin-vue": "^9.12.0",
"jest": "^29.7.0",
Expand All @@ -84,6 +76,7 @@
"npm-run-all2": "^7.0.1",
"prettier": "^3.0.1",
"superagent": "^9.0.0",
"supertest": "^7.0.0",
"ts-jest": "^29.2.5",
"ts-node": "^10.9.1",
"typescript": "~5.2.2",
Expand Down
81 changes: 29 additions & 52 deletions server/controllers/OpeyIIController.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
import { streamText } from 'ai'
import axios from 'axios'
import { Controller, Session, Req, Res, Post, Get } from 'routing-controllers'
import { Request, Response } from 'express'
import { pipeline } from "node:stream/promises"
import { Service } from 'typedi'
import OBPClientService from '../services/OBPClientService'
import OpeyClientService from '../services/OpeyClientService'
import { v6 as uuid6 } from 'uuid';
import { Transform } from 'stream'

import { UserInput } from '../schema/OpeySchema'

@Service()
Expand Down Expand Up @@ -41,7 +39,7 @@ export class OpeyController {
async streamOpey(
@Session() session: any,
@Req() request: Request,
@Res() response: Response
@Res() response: Response,
) {

let user_input: UserInput
Expand All @@ -59,67 +57,46 @@ export class OpeyController {

console.log("Calling OpeyClientService.stream")

const streamMiddlewareTransform = new Transform({
transform(chunk, encoding, callback) {
console.log(`Logged Chunk: ${chunk}`)
this.push(chunk);
// const streamMiddlewareTransform = new Transform({
// transform(chunk, encoding, callback) {
// console.log(`Logged Chunk: ${chunk}`)
// this.push(chunk);

callback();
}
})
// callback();
// }
// })

let nodeStream: NodeJS.ReadableStream | null = null
let stream: NodeJS.ReadableStream | null = null

try {
// Read stream from OpeyClientService
nodeStream = await this.opeyClientService.stream(user_input)
console.debug(`Stream received readable: ${nodeStream.readable}`)
stream = await this.opeyClientService.stream(user_input)
console.debug(`Stream received readable: ${stream?.readable}`)

} catch (error) {
console.error("Error reading stream: ", error)
response.status(500).json({ error: 'Internal Server Error' })
return
return response.status(500).json({ error: 'Internal Server Error' })
}

if (!nodeStream || !nodeStream.readable) {
console.error("Stream is not readable")
response.status(500).json({ error: 'Internal Server Error' })
return
if (!stream || !stream.readable) {
console.error("Stream is not recieved or not readable")
return response.status(500).json({ error: 'Internal Server Error' })
}

try {
// response.writeHead(200, {
// 'Content-Type': "text/event-stream",
// 'Cache-Control': "no-cache",
// 'Connection': "keep-alive"
// });

response.setHeader('Content-Type', 'text/event-stream')
response.setHeader('Cache-Control', 'no-cache')
response.setHeader('Connection', 'keep-alive')

let data: any[] = []

nodeStream.on('data', (chunk) => {
const bufferChunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
data.push(bufferChunk);
response.write(`data: ${chunk.toString()}\n\n`)
return new Promise<Response>((resolve, reject) => {
stream.pipe(response)
stream.on('end', () => {
response.status(200)
resolve(response)
})
nodeStream.on('end', () => {
//console.log('Stream ended')
const totalData = Buffer.concat(data)
response.write(totalData)
response.end()
stream.on('error', (error) => {
console.error("Error piping stream: ", error)
reject(error)
})
nodeStream.on('error', (error) => {
console.error(error)
response.write(`data: Error reading stream\n\n`)
response.end()
})
} catch (error) {
console.error("Error writing data: ", error)
response.status(500).json({ error: 'Internal Server Error' })
}

})


}

@Post('/invoke')
Expand Down
10 changes: 7 additions & 3 deletions server/services/OpeyClientService.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Service } from 'typedi'
import { UserInput, StreamInput, OpeyConfig, AuthConfig } from '../schema/OpeySchema'
import fetch from 'node-fetch';
import { Readable } from "stream"
import fetch from 'node-fetch'

@Service()
export default class OpeyClientService {
Expand Down Expand Up @@ -47,7 +48,7 @@ export default class OpeyClientService {
}
}

async stream(user_input: UserInput): Promise<NodeJS.ReadableStream> {
async stream(user_input: UserInput): Promise<any> {
// Endpoint to post a message to Opey and stream the response tokens/messages
try {

Expand All @@ -69,7 +70,10 @@ export default class OpeyClientService {
if (!response.body) {
throw new Error("No response body")
}
return response.body as NodeJS.ReadableStream

console.log("Got response body: ", response.body) //DEBUG

return response.body
}
catch (error) {
throw new Error(`Error streaming from Opey: ${error}`)
Expand Down
47 changes: 26 additions & 21 deletions tests/opey-unit.test.ts → tests/opey-controller.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,15 @@ describe('OpeyController', () => {
expect(res.statusCode).toBe(200);
})

it('streamOpey', () => {
it('streamOpey', async () => {

const _eventEmitter = new EventEmitter();
_eventEmitter.addListener('data', () => {
console.log('Data received')
})
// The default event emitter does nothing, so replace
const res = httpMocks.createResponse({
eventEmitter: _eventEmitter,
const res = await httpMocks.createResponse({
eventEmitter: EventEmitter,
writableStream: Stream.Writable
});

Expand All @@ -132,27 +132,32 @@ describe('OpeyController', () => {
} as unknown as Request;

// Define handelrs for events
res.on('end', () => {
console.log('Stream ended')
console.log(res._getData())
expect(res.statusCode).toBe(200);
})



let chunks: any[] = [];
res.on('data', (chunk) => {
console.log(chunk)
chunks.push(chunk);
expect(chunk).toBeDefined();
})

opeyController.streamOpey({}, req, res)
.then((res) => {
console.log(res)
})
try {
const response = await opeyController.streamOpey({}, req, res)

response.on('end', async () => {
console.log('Stream ended')
console.log(res._getData())
await expect(res.statusCode).toBe(200);
})

response.on('data', async (chunk) => {
console.log(chunk)
await chunks.push(chunk);
await expect(chunk).toBeDefined();
})
} catch (error) {
console.error(error)
}


expect(chunks.length).toBe(10);
expect(MockOpeyClientService.stream).toHaveBeenCalled();
expect(res).toBeDefined();
await expect(chunks.length).toBe(10);
await expect(MockOpeyClientService.stream).toHaveBeenCalled();
await expect(res).toBeDefined();

})
})
Loading

0 comments on commit 648d35e

Please # to comment.