forked from Tigul/neem_interface_python
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathrosprolog_client.py
126 lines (108 loc) · 5.24 KB
/
rosprolog_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import rospy
from json_prolog_msgs.srv import PrologQuery, PrologNextSolution, PrologFinish, PrologQueryRequest, PrologNextSolutionRequest, PrologFinishRequest
from typing import Iterator, Dict, List, Optional
"""
This is the ROS Prolog Client which connects to the Rosprolog service of (old) KnowRob.
It allows you to send Prolog queries to KnowRob.
This interface is meant to be used with the old KnowRob, which is: https://github.com/suturo/knowrob
"""
class PrologQueryWrapper:
def __init__(self, query_str: str, simple_query_srv, next_solution_srv, finish_srv, iterative=True):
"""
Wraps around ROS Prolog services for a convenient Python interface.
:param iterative: if False, all solutions will be calculated by rosprolog during the first service call
"""
self._simple_query_srv = simple_query_srv
self._next_solution_srv = next_solution_srv
self._finish_query_srv = finish_srv
self._finished = False
self._query_id = None
# Request to initiate the query
try:
request = PrologQueryRequest()
request.query = query_str
request.mode = 1 if iterative else 0
response = self._simple_query_srv(request)
if not response.ok:
raise Exception(f'Prolog query failed: {response.message}')
#self._query_id = response.id
except rospy.ServiceException as e:
raise Exception(f"Service call failed: {e}")
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.finish()
def solutions(self) -> Iterator[Dict]:
"""Retrieve solutions from the query one at a time."""
try:
while not self._finished:
request = PrologNextSolutionRequest(id=self._query_id)
response = self._next_solution_srv(request)
if response.status == response.OK:
yield self._json_to_dict(response.solution)
elif response.status == response.NO_SOLUTION:
break
elif response.status == response.QUERY_FAILED:
raise Exception(f"Prolog query failed: {response}")
else:
raise Exception(f"Unknown query status {response.status}")
finally:
self.finish()
def finish(self):
"""Finish the Prolog query and release resources."""
if not self._finished:
try:
request = PrologFinishRequest(id=self._query_id)
self._finish_query_srv(request)
except rospy.ServiceException as e:
rospy.logwarn(f"Failed to finish query: {e}")
finally:
self._finished = True
def _json_to_dict(self, json_text):
import json
return json.loads(json_text)
class Prolog:
def __init__(self, namespace='rosprolog'):
"""Initialize Prolog service clients."""
rospy.loginfo("[KnowRob] initialize client...")
rospy.wait_for_service(f'{namespace}/query')
rospy.wait_for_service(f'{namespace}/next_solution')
rospy.wait_for_service(f'{namespace}/finish')
self._simple_query_srv = rospy.ServiceProxy(f'{namespace}/query', PrologQuery)
self._next_solution_srv = rospy.ServiceProxy(f'{namespace}/next_solution', PrologNextSolution)
self._finish_query_srv = rospy.ServiceProxy(f'{namespace}/finish', PrologFinish)
#self.all_solutions(f"init_gpsr_2024.") # intit semantic map things
rospy.loginfo("[KnowRob] done.")
def query(self, query_str):
"""Execute a Prolog query, yielding solutions iteratively."""
return PrologQueryWrapper(query_str, simple_query_srv=self._simple_query_srv,
next_solution_srv=self._next_solution_srv, finish_srv=self._finish_query_srv)
def once(self, query_str: str) -> Optional[Dict]:
"""Retrieve one solution from the query and finish immediately."""
# differenciate between fallschool designator queries and normal prolog
#if "type=" in query_str:
# return kdc.send_simple_query(query_str)
print(f"Query Once: {query_str}")
with PrologQueryWrapper(query_str, self._simple_query_srv, self._next_solution_srv, self._finish_query_srv) as query:
try:
solution = next(query.solutions())
if 0 == len(solution):
return True
else:
return solution
except StopIteration:
return False
def all_solutions(self, query_str: str) -> List[Dict]:
"""Retrieve all solutions at once."""
# differenciate between fallschool designator queries and normal prolog
#if "type=" in query_str:
# return kdc.send_simple_query(query_str)
#print(f"{query_str}")
with PrologQueryWrapper(query_str, self._simple_query_srv, self._next_solution_srv, self._finish_query_srv, iterative=False) as query:
solution = list(query.solutions())
if 0 == len(solution):
return False
elif (1 == len(solution)) and (0 == len(solution[0])):
return True
else:
return solution