Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Bugfix/Openai assistant thread not found #3426

Merged
merged 1 commit into from
Oct 29, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
237 changes: 147 additions & 90 deletions packages/components/nodes/agents/OpenAIAssistant/OpenAIAssistant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -267,28 +267,54 @@ class OpenAIAssistant_Agents implements INode {
// List all runs, in case existing thread is still running
if (!isNewThread) {
const promise = (threadId: string) => {
return new Promise<void>((resolve) => {
return new Promise<void>((resolve, reject) => {
const maxWaitTime = 30000 // Maximum wait time of 30 seconds
const startTime = Date.now()
let delay = 500 // Initial delay between retries
const maxRetries = 10
let retries = 0

const timeout = setInterval(async () => {
const allRuns = await openai.beta.threads.runs.list(threadId)
if (allRuns.data && allRuns.data.length) {
const firstRunId = allRuns.data[0].id
const runStatus = allRuns.data.find((run) => run.id === firstRunId)?.status
if (
runStatus &&
(runStatus === 'cancelled' ||
runStatus === 'completed' ||
runStatus === 'expired' ||
runStatus === 'failed' ||
runStatus === 'requires_action')
) {
try {
const allRuns = await openai.beta.threads.runs.list(threadId)
if (allRuns.data && allRuns.data.length) {
const firstRunId = allRuns.data[0].id
const runStatus = allRuns.data.find((run) => run.id === firstRunId)?.status
if (
runStatus &&
(runStatus === 'cancelled' ||
runStatus === 'completed' ||
runStatus === 'expired' ||
runStatus === 'failed' ||
runStatus === 'requires_action')
) {
clearInterval(timeout)
resolve()
}
} else {
clearInterval(timeout)
resolve()
reject(new Error(`Empty Thread: ${threadId}`))
}
} else {
} catch (error: any) {
if (error.response?.status === 404) {
clearInterval(timeout)
reject(new Error(`Thread not found: ${threadId}`))
} else if (error.response?.status === 429 && retries < maxRetries) {
retries++
delay *= 2
console.warn(`Rate limit exceeded, retrying in ${delay}ms...`)
} else {
clearInterval(timeout)
reject(new Error(`Unexpected error: ${error.message}`))
}
}

// Timeout condition to stop the loop if maxWaitTime is exceeded
if (Date.now() - startTime > maxWaitTime) {
clearInterval(timeout)
resolve()
reject(new Error('Timeout waiting for thread to finish.'))
}
}, 500)
}, delay)
})
}
await promise(threadId)
Expand Down Expand Up @@ -576,96 +602,127 @@ class OpenAIAssistant_Agents implements INode {

const promise = (threadId: string, runId: string) => {
return new Promise((resolve, reject) => {
const maxWaitTime = 30000 // Maximum wait time of 30 seconds
const startTime = Date.now()
let delay = 500 // Initial delay between retries
const maxRetries = 10
let retries = 0

const timeout = setInterval(async () => {
const run = await openai.beta.threads.runs.retrieve(threadId, runId)
const state = run.status
if (state === 'completed') {
clearInterval(timeout)
resolve(state)
} else if (state === 'requires_action') {
if (run.required_action?.submit_tool_outputs.tool_calls) {
try {
const run = await openai.beta.threads.runs.retrieve(threadId, runId)
const state = run.status

if (state === 'completed') {
clearInterval(timeout)
const actions: ICommonObject[] = []
run.required_action.submit_tool_outputs.tool_calls.forEach((item) => {
const functionCall = item.function
let args = {}
try {
args = JSON.parse(functionCall.arguments)
} catch (e) {
console.error('Error parsing arguments, default to empty object')
}
actions.push({
tool: functionCall.name,
toolInput: args,
toolCallId: item.id
resolve(state)
} else if (state === 'requires_action') {
if (run.required_action?.submit_tool_outputs.tool_calls) {
clearInterval(timeout)
const actions: ICommonObject[] = []
run.required_action.submit_tool_outputs.tool_calls.forEach((item) => {
const functionCall = item.function
let args = {}
try {
args = JSON.parse(functionCall.arguments)
} catch (e) {
console.error('Error parsing arguments, default to empty object')
}
actions.push({
tool: functionCall.name,
toolInput: args,
toolCallId: item.id
})
})
})

const submitToolOutputs = []
for (let i = 0; i < actions.length; i += 1) {
const tool = tools.find((tool: any) => tool.name === actions[i].tool)
if (!tool) continue
const submitToolOutputs = []
for (let i = 0; i < actions.length; i += 1) {
const tool = tools.find((tool: any) => tool.name === actions[i].tool)
if (!tool) continue

// Start tool analytics
const toolIds = await analyticHandlers.onToolStart(tool.name, actions[i].toolInput, parentIds)
if (shouldStreamResponse && sseStreamer) {
sseStreamer.streamToolEvent(chatId, tool.name)
// Start tool analytics
const toolIds = await analyticHandlers.onToolStart(tool.name, actions[i].toolInput, parentIds)
if (shouldStreamResponse && sseStreamer) {
sseStreamer.streamToolEvent(chatId, tool.name)
}

try {
const toolOutput = await tool.call(actions[i].toolInput, undefined, undefined, {
sessionId: threadId,
chatId: options.chatId,
input
})
await analyticHandlers.onToolEnd(toolIds, toolOutput)
submitToolOutputs.push({
tool_call_id: actions[i].toolCallId,
output: toolOutput
})
usedTools.push({
tool: tool.name,
toolInput: actions[i].toolInput,
toolOutput
})
} catch (e) {
await analyticHandlers.onToolEnd(toolIds, e)
console.error('Error executing tool', e)
clearInterval(timeout)
reject(
new Error(
`Error processing thread: ${state}, Thread ID: ${threadId}, Run ID: ${runId}, Tool: ${tool.name}`
)
)
return
}
}

const newRun = await openai.beta.threads.runs.retrieve(threadId, runId)
const newStatus = newRun?.status

try {
const toolOutput = await tool.call(actions[i].toolInput, undefined, undefined, {
sessionId: threadId,
chatId: options.chatId,
input
})
await analyticHandlers.onToolEnd(toolIds, toolOutput)
submitToolOutputs.push({
tool_call_id: actions[i].toolCallId,
output: toolOutput
})
usedTools.push({
tool: tool.name,
toolInput: actions[i].toolInput,
toolOutput
})
if (submitToolOutputs.length && newStatus === 'requires_action') {
await openai.beta.threads.runs.submitToolOutputs(threadId, runId, {
tool_outputs: submitToolOutputs
})
resolve(state)
} else {
await openai.beta.threads.runs.cancel(threadId, runId)
resolve('requires_action_retry')
}
} catch (e) {
await analyticHandlers.onToolEnd(toolIds, e)
console.error('Error executing tool', e)
clearInterval(timeout)
reject(
new Error(
`Error processing thread: ${state}, Thread ID: ${threadId}, Run ID: ${runId}, Tool: ${tool.name}`
)
new Error(`Error submitting tool outputs: ${state}, Thread ID: ${threadId}, Run ID: ${runId}`)
)
break
}
}

const newRun = await openai.beta.threads.runs.retrieve(threadId, runId)
const newStatus = newRun?.status

try {
if (submitToolOutputs.length && newStatus === 'requires_action') {
await openai.beta.threads.runs.submitToolOutputs(threadId, runId, {
tool_outputs: submitToolOutputs
})
resolve(state)
} else {
await openai.beta.threads.runs.cancel(threadId, runId)
resolve('requires_action_retry')
}
} catch (e) {
clearInterval(timeout)
reject(new Error(`Error submitting tool outputs: ${state}, Thread ID: ${threadId}, Run ID: ${runId}`))
}
} else if (state === 'cancelled' || state === 'expired' || state === 'failed') {
clearInterval(timeout)
reject(
new Error(
`Error processing thread: ${state}, Thread ID: ${threadId}, Run ID: ${runId}, Status: ${state}`
)
)
}
} else if (state === 'cancelled' || state === 'expired' || state === 'failed') {
} catch (error: any) {
if (error.response?.status === 404 || error.response?.status === 429) {
clearInterval(timeout)
reject(new Error(`API error: ${error.response?.status} for Thread ID: ${threadId}, Run ID: ${runId}`))
} else if (retries < maxRetries) {
retries++
delay *= 2 // Exponential backoff
console.warn(`Transient error, retrying in ${delay}ms...`)
} else {
clearInterval(timeout)
reject(new Error(`Max retries reached. Error: ${error.message}`))
}
}

// Stop the loop if maximum wait time is exceeded
if (Date.now() - startTime > maxWaitTime) {
clearInterval(timeout)
reject(
new Error(`Error processing thread: ${state}, Thread ID: ${threadId}, Run ID: ${runId}, Status: ${state}`)
)
reject(new Error('Timeout waiting for thread to finish.'))
}
}, 500)
}, delay)
})
}

Expand Down
Loading