Source code for move_base_nav

#! /usr/bin/env python

'''navigation using MoveBase and the navigation stack

In a way very similar to how the :any:`node-bug-m` acts, this node
implements a composite behaviour which drives the robot towards a target
plus a final orientation. But in this case, the node uses MoveBase uses
the navigation stack to reach the final position. 

Authors
	prof. Carmine Recchiuto (UniGe), Francesco Ganci (S4143910)

Version:
	v1.5.0
'''

import rospy
import os
from std_srvs.srv import SetBool, SetBoolRequest, SetBoolResponse
from std_srvs.srv import Empty, EmptyRequest, EmptyResponse
from geometry_msgs.msg import Point
from tf import transformations
import math
from nav_msgs.msg import Odometry
from move_base_msgs.msg import MoveBaseActionGoal
from actionlib_msgs.msg import GoalID



active_ = False
''' the activity status of the node. 
'''



service_move_base_switch = "/nav_stack_switch"
''' name of the service to trigger the state of the node
'''

srv_move_base_switch = None
''' service handle for :any:`service_move_base_switch`
'''

[docs]def move_base_switch( data ): ''' used for switching the activity status of the node. The node also updates the target, reading the new one from the parameters server. Arguments: data (SetBool) : the boolean SetBool.data inside the message is the new activity status Note: before calling the service, make sure the three ros parameters 'des_pos_x' , 'des_pos_y' and 'des_yaw' have been set. Todo: differently from the other behavioural nodes, this node requires to be retriggered using this service each time the objective changes. this could be a desin issue. The node should update automatically the objective when it changes. ''' global active_ global desired_position_, desired_yaw_ res = SetBoolResponse( ) res.success = True res.message = "" # check the activity status if active_ and not data.data: # rospy.loginfo( "(move_base_nav ) move_base navigation is OFF" ) active_ = False change_state( 0 ) elif not active_ and data.data: #rospy.loginfo( "(move_base_nav ) move_base navigation is ON" ) active_ = True elif active_ and data.data: #rospy.loginfo( "(move_base_nav ) switching target" ) pass else: res.message = "trying to disable something already off ..." if active_: desired_position_.x = rospy.get_param( "des_pos_x" ) desired_position_.y = rospy.get_param( "des_pos_y" ) desired_yaw_ = rospy.get_param( "des_yaw" ) #rospy.loginfo( f"(move_base_nav ) new target is (x={desired_position_.x}, y={desired_position_.y}, yaw={desired_yaw_})" ) return res
service_move_base_signal = "/nav_stack_signal" ''' the node assumes that some other node implements this client, used for sending the signal when the robot reaches the objective. ''' cl_move_base_signal = None ''' (service client handle) client for :any:`service_move_base_signal` '''
[docs]def send_signal( ): ''' send a signal through the service :any:`service_move_base_signal` if available. ''' global cl_move_base_signal global service_move_base_signal if cl_move_base_signal == None: #rospy.loginfo( f"(move_base_nav ) TRYING client {service_move_base_signal} ... " ) rospy.sleep(rospy.Duration(1)) cl_move_base_signal = rospy.ServiceProxy( service_move_base_signal, Empty ) if cl_move_base_signal == None: #rospy.loginfo( f"(move_base_nav ) unable to connect with {service_move_base_signal} -- retrying later..." ) return else: #rospy.loginfo( f"(move_base_nav ) FOUND SERVICE {service_move_base_signal}" ) pass if cl_move_base_signal != None: try: cl_move_base_signal( ) except rospy.ServiceException: #rospy.loginfo( f"(move_base_nav ) unable to connect with {service_move_base_signal} -- service call failed" ) cl_move_base_signal = None
service_head_orient_switch = "/head_orient_switch" ''' service name for the head orientation ''' cl_head_orient_switch = None ''' (client handle) client to enable and disable the head orientation ''' # current values position_ = Point( ) ''' the current position of the robot, from the odometry ''' yaw_ = 0.0 ''' the orientation of the robot about the 'z' axis, from the odometry ''' # target desired_position_ = Point( ) ''' the objective position for move_base. obtained from the parameters 'des_pos_x' and 'des_pos_y' ''' desired_position_.x = 0.0 desired_position_.y = 0.0 desired_position_.z = 0.0 desired_yaw_ = 0.0 ''' the current orientation of the robot ''' # error evaluation err_pos = math.inf ''' teh error between the desired position and the current one. Note: if the position is not measured, the distance will be always infinite. ''' err_yaw = math.pi/2.0 ''' the error between the current orientation and the desired one. Note: its default value is PI/2, and also when the distance is not measured ''' # thresholds threshold_position_ = 0.35 ''' the maximum allowed position error while reaching the target. ''' yaw_precision_ = math.pi / 9 ''' +/- 20 degree allowed ''' yaw_precision_2_ = math.pi / 90 ''' +/- 2 degree allowed ''' # odometry measurement topic_odometry = "/odom" ''' the node measures the position of the robot during the navigation. ''' sub_odometry = None ''' (subscription handle) subscription to the odometry '''
[docs]def normalize_angle( angle ): ''' angle normalization between -PI and PI. Note: the parameter "angle" is the err_yaw in this implementation. ''' if(math.fabs(angle) > math.pi): angle = angle - (2 * math.pi * angle) / (math.fabs(angle)) return angle
[docs]def cbk_odometry( msg ): ''' subscription callback for the odometry topic ''' global active_ global position_, yaw_ global desired_position_, desired_yaw_ global err_pos, err_yaw if active_: # position position_ = msg.pose.pose.position # yaw quaternion = ( msg.pose.pose.orientation.x, msg.pose.pose.orientation.y, msg.pose.pose.orientation.z, msg.pose.pose.orientation.w) euler = transformations.euler_from_quaternion(quaternion) yaw_ = euler[2] # estimation of errors err_pos = math.sqrt(pow(desired_position_.y - position_.y, 2) + pow(desired_position_.x - position_.x, 2)) err_yaw = normalize_angle(desired_yaw_ - yaw_) else: # set defaults err_pos = math.inf err_yaw = math.pi / 2.0
topic_move_base_goal = "/move_base/goal" ''' the topic for sending the target to the navigation stack ''' pub_move_base_goal = None ''' (publisher handle) move base goal '''
[docs]def move_base_send_target( x, y ): ''' send a target to move base. Parameters: x (float) : the x coordinate y (float) : the y coordinate ''' global topic_move_base_goal, pub_move_base_goal msg = MoveBaseActionGoal( ) msg.header.frame_id = "map" msg.goal.target_pose.header.frame_id = "map" msg.goal.target_pose.pose.position.x = x msg.goal.target_pose.pose.position.y = y msg.goal.target_pose.pose.position.z = 0.0 msg.goal.target_pose.pose.orientation.x = 0.0 msg.goal.target_pose.pose.orientation.y = 0.0 msg.goal.target_pose.pose.orientation.z = 0.0 msg.goal.target_pose.pose.orientation.w = 1.0 pub_move_base_goal.publish( msg ) rospy.sleep( rospy.Duration(1) )
topic_move_base_cancel = "/move_base/cancel" ''' name of the topic for cancelling the navigation request ''' pub_move_base_cancel = None ''' (publisher handle) the publisher for cancelling the request '''
[docs]def move_base_cancel_goal( ): ''' send a cancellation request to move_base ''' global topic_move_base_cancel, pub_move_base_cancel msg = GoalID( ) pub_move_base_cancel.publish( msg ) rospy.sleep( rospy.Duration(1) )
state_ = 0 ''' the current state of the node '''
[docs]def change_state( state ): ''' state transition from the current one to the one in the argument. ''' global state_ global cl_head_orient_switch global desired_position_ global state_description state_ = state if state_ == 0: # -- idle state_description = "idle -- waiting for a target" cl_head_orient_switch( False ) move_base_cancel_goal( ) elif state_ == 1: # -- move_base planning state_description = "motion planning -- sending the request to move_base" cl_head_orient_switch( False ) move_base_cancel_goal( ) move_base_send_target( desired_position_.x, desired_position_.y ) elif state_ == 2: # -- move_base motion state_description = "navigation -- going towards the target" cl_head_orient_switch( False ) elif state_ == 3: # -- head_orientation behaviour state_description = "head orientation" move_base_cancel_goal( ) cl_head_orient_switch( True ) elif state_ == 4: # -- send signal (end of the motion) state_description = "SUCCESS. sending signal" cl_head_orient_switch( False ) move_base_cancel_goal( ) else: #rospy.logwarn( f"(move_base_nav ) WARNING: unknown state {state_}" ) change_state( 0 )
replan_time = 30 # seconds ''' the planner, in case the robot is requiring too much time for reaching the point, can do a replanning request ''' working_rate = 10 # Hz ''' the working rate is the maximum update frequency of this node. the status is checked with a frequency of 'working_rate' Hz. ''' state_description = "" ''' just a message to make clear the current status of the node '''
[docs]def main( ): ''' state machine implementation ''' global active_, state_ global err_pos, err_yaw global threshold_position_, yaw_precision_2_ global replan_time, working_rate global state_description ''' while not rospy.is_shutdown(): if active_: rospy.loginfo( "ON" ) else: rospy.loginfo( "OFF" ) ''' ''' rospy.spin( ) ''' r = rospy.Rate( working_rate ) elapsed_time = 0.0 time_unit = rospy.Duration( 1/working_rate ) prev_output = "" while not rospy.is_shutdown( ): # wait (in any case) r.sleep( ) # check active status if not active_: continue # update the goal desired_position_.x = rospy.get_param( "des_pos_x" ) desired_position_.y = rospy.get_param( "des_pos_y" ) desired_yaw_ = rospy.get_param( "des_yaw" ) # run the state machine if state_ == 0: # -- idle if err_pos > threshold_position_: change_state( 1 ) elif err_yaw > yaw_precision_2_ : change_state( 2 ) elif state_ == 1: # -- move_base planning elapsed_time = 0.0 change_state( 2 ) elif state_ == 2: # -- move_base motion elapsed_time = elapsed_time + time_unit.to_sec() if elapsed_time > replan_time: #rospy.loginfo( f"(move_base_nav ) replanning move_base (timer expired, max {replan_time})" ) change_state( 1 ) # -- replanning continue if err_pos < threshold_position_: if err_yaw > yaw_precision_2_: change_state( 3 ) # -- orientation else: change_state( 4 ) # -- end of the task elif state_ == 3: # -- head_orientation behaviour if err_yaw <= yaw_precision_2_: change_state( 4 ) elif state_ == 4: # -- send signal (end of the motion) send_signal( ) change_state( 0 ) else: #rospy.logwarn( f"(move_base_nav ) WARNING: unknown state {state_}" ) change_state( 0 ) # output : current status if prev_output != state_description: #rospy.loginfo( f"(move_base_nav ) current state = {state_} ({state_description})" ) prev_output = state_description
def on_shut( ): rospy.loginfo( f"(move_base_nav ) closing..." ) if __name__ == "__main__": rospy.init_node( "move_base_nav" ) rospy.on_shutdown( on_shut ) # rospy.loginfo( f"(move_base_nav )" ) #rospy.loginfo( f"(move_base_nav ) starting..." ) rospy.sleep(rospy.Duration(2)) if not rospy.has_param( "des_pos_x" ) or not rospy.has_param( "des_pos_y" ) or not rospy.has_param( "des_yaw" ): #rospy.logerr( "(move_base_nav ) ERROR: parameters not found in the parameter server!" ) rospy.shutdown( ) rospy.sleep(rospy.Duration(1)) os.exit( ) #rospyloginfo( f"(move_base_nav ) service {service_move_base_switch} ... " ) srv_move_base_switch = rospy.Service( service_move_base_switch, SetBool, move_base_switch ) #rospy.loginfo( f"(move_base_nav ) service {service_move_base_switch} ... OK" ) #rospy.loginfo( f"(move_base_nav ) subscription: {topic_odometry} ... " ) sub_odometry = rospy.Subscriber( topic_odometry , Odometry, cbk_odometry ) rospy.sleep(rospy.Duration(1)) #rospy.loginfo( f"(move_base_nav ) subscription: {topic_odometry} ... OK" ) #rospy.loginfo( f"(move_base_nav ) client: {service_head_orient_switch} ... " ) cl_head_orient_switch = rospy.ServiceProxy( service_head_orient_switch, SetBool ) rospy.sleep(rospy.Duration(1)) #rospy.loginfo( f"(move_base_nav ) client: {service_head_orient_switch} ... OK" ) #rospy.loginfo( f"(move_base_nav ) publisher: {topic_move_base_goal} ... " ) pub_move_base_goal = rospy.Publisher( topic_move_base_goal, MoveBaseActionGoal, queue_size=10 ) #rospy.loginfo( f"(move_base_nav ) publisher: {topic_move_base_goal} ... OK" ) #rospy.loginfo( f"(move_base_nav ) publisher: {topic_move_base_cancel} ... " ) pub_move_base_cancel = rospy.Publisher( topic_move_base_cancel, GoalID, queue_size=10 ) #rospy.loginfo( f"(move_base_nav ) publisher: {topic_move_base_cancel} ... OK" ) rospy.sleep(rospy.Duration(2)) #rospy.loginfo( f"(move_base_nav ) ready" ) main( )