import os
import re
import sys

# Put this script's directory and the directory two levels above it on
# sys.path.  The functions below lazily import `lm_classifier`,
# `robobrain_runner` and `evaluation`; presumably those modules live in one of
# these directories -- confirm against the repository layout.
_SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__))
_SUBMIT_ROOT = os.path.dirname(os.path.dirname(_SCRIPTS_DIR))
if _SCRIPTS_DIR not in sys.path:
    sys.path.insert(0, _SCRIPTS_DIR)
if _SUBMIT_ROOT not in sys.path:
    sys.path.insert(0, _SUBMIT_ROOT)

# Finds the spatial relation in questions phrased as
# "... situated to the <relation> the ...".
_DIRECTION_PATTERN = re.compile(
    r"situated to the (left of|right of|in front of|behind|below|above) the",
    re.IGNORECASE,
)

# Relations that are recognised by the pattern but never remapped.
_SKIP_DIRECTIONS = {"below", "above"}

# Relation phrase (as written in the question) -> canonical direction key.
_DIRECTION_CANONICAL = {
    "left of": "left",
    "right of": "right",
    "in front of": "front",
    "behind": "behind",
}

# Canonical direction key -> relation phrase used when rewriting a question.
# `_DIRECTION_CANONICAL` emits "front" while this table was originally keyed
# only by "in_front_of"; both spellings are accepted so that a direction map
# using either one can be rendered back into a question.
_DIRECTION_TO_PHRASE = {
    "left": "left of",
    "right": "right of",
    "front": "in front of",
    "in_front_of": "in front of",
    "behind": "behind",
}

_FACING_PROMPT_TEMPLATE = (
    "In the image, there is a {object_name}. "
    "Which direction is the {object_name} facing from the camera's perspective? "
    "Choose exactly one and output only that choice: "
    "left, right, toward the camera, away from the camera."
)

_OBJECT_EXTRACTION_SYSTEM = (
    "From the following sentence, identify the target object that is being acted upon, "
    "placed, or referred to as the main subject of interest. "
    "Output only the object name, nothing else."
)

# Marker that closes a reasoning section in the classifier output; everything
# up to and including it is discarded.  The literal is assembled from two
# pieces so that markup-sanitising tooling cannot strip it from the source.
# NOTE(review): the previous code split on an empty string, which always raises
# ValueError.  The closing reasoning tag is assumed to be the intended
# separator -- confirm against the classifier model's chat template.
_THINK_END_TAG = "</" + "think>"


def should_remap(question):
    """Decide whether the spatial relation in *question* should be remapped.

    Returns:
        A ``(do_remap, canonical, phrase)`` tuple.  When the question contains
        no recognised relation, or the relation is one of
        ``_SKIP_DIRECTIONS``, the result is ``(False, None, None)``.
        Otherwise ``phrase`` is the lower-cased relation found in the question
        and ``canonical`` is its entry in ``_DIRECTION_CANONICAL``.
    """
    match = _DIRECTION_PATTERN.search(question)
    if match is None:
        return False, None, None
    phrase = match.group(1).lower()
    if phrase in _SKIP_DIRECTIONS:
        return False, None, None
    return True, _DIRECTION_CANONICAL.get(phrase), phrase


def _parse_facing_direction(raw_output):
    """Map a free-text facing answer to a ``facing_*`` key.

    Returns one of ``"facing_away_from_camera"``, ``"facing_toward_camera"``,
    ``"facing_left"``, ``"facing_right"``, or ``None`` when the answer is
    missing, not a string, or cannot be classified.
    """
    if not isinstance(raw_output, str):
        # A missing / malformed model answer is treated as "unknown" rather
        # than raising AttributeError on `.lower()`.
        return None
    text = raw_output.lower()
    # Order matters: "away from" is tested before "toward", and the explicit
    # camera phrases before the bare left/right keywords.
    if "away from" in text or "backward" in text:
        return "facing_away_from_camera"
    if "toward the camera" in text or "towards the camera" in text:
        return "facing_toward_camera"
    if "forward" in text:
        return "facing_toward_camera"
    has_left = "left" in text
    has_right = "right" in text
    # An answer mentioning both left and right is ambiguous and is skipped.
    if has_left and not has_right:
        return "facing_left"
    if has_right and not has_left:
        return "facing_right"
    if "toward" in text or "towards" in text:
        return "facing_toward_camera"
    return None


def _map_direction(orig_canonical, facing_key, direction_map):
    """Look up the remapped direction for a facing key and original direction.

    ``direction_map`` is read as ``direction_map[facing_key][orig_canonical]``.
    Returns ``None`` if either input is ``None`` or the lookup has no entry.
    """
    if orig_canonical is None or facing_key is None:
        return None
    return direction_map.get(facing_key, {}).get(orig_canonical)


def _replace_direction_in_question(question, orig_phrase, new_canonical):
    """Rewrite ``to the <orig_phrase> the`` in *question* with a new relation.

    The match is case-insensitive, mirroring ``_DIRECTION_PATTERN`` (which is
    how ``orig_phrase`` was detected and lower-cased in the first place); the
    surrounding "to the" / "the" text is kept exactly as written.  The question
    is returned unchanged when ``new_canonical`` has no entry in
    ``_DIRECTION_TO_PHRASE`` or ``orig_phrase`` is empty.
    """
    new_phrase = _DIRECTION_TO_PHRASE.get(new_canonical)
    if new_phrase is None or not orig_phrase:
        return question
    pattern = re.compile(
        r"(to the )" + re.escape(orig_phrase) + r"( the)",
        re.IGNORECASE,
    )
    # A callable replacement keeps `new_phrase` literal (no backslash/group
    # interpretation).
    return pattern.sub(lambda m: m.group(1) + new_phrase + m.group(2), question)


def _clean_extracted_object(raw):
    """Normalise raw classifier output into a usable object name.

    Drops everything up to and including the first reasoning-end marker,
    strips whitespace, and falls back to ``"object"`` when nothing usable
    remains (empty output or the literal ``"[]"``).
    """
    text = (raw or "").strip()
    if _THINK_END_TAG in text:
        text = text.split(_THINK_END_TAG, 1)[1].strip()
    if not text or text == "[]":
        return "object"
    return text


def _extract_object_from_question(question, clf_kwargs):
    """Return the name of the object the question is about.

    Fast path: if the question contains "there is a <name>." the name is taken
    directly from the text and ``clf_kwargs`` is not used.

    Fallback: the first sentence of the question is sent to the language model
    in ``clf_kwargs["model"]`` / ``clf_kwargs["tokenizer"]`` with the
    ``_OBJECT_EXTRACTION_SYSTEM`` prompt (greedy decoding, at most 16 new
    tokens); the decoded output is cleaned by ``_clean_extracted_object``.
    """
    m = re.search(r"there is a (.+?)\.", question, re.IGNORECASE)
    if m:
        return m.group(1).strip()

    # Heavy dependencies are imported lazily so the regex fast path works
    # without them.
    import torch
    from lm_classifier import _apply_chat_template

    # Text up to and including the first "."; the whole question if it has
    # no period.
    head, dot, _rest = question.partition(".")
    first_sentence = head + dot

    clf_model = clf_kwargs["model"]
    clf_tokenizer = clf_kwargs["tokenizer"]
    first_device = next(clf_model.parameters()).device

    messages = [
        {"role": "system", "content": _OBJECT_EXTRACTION_SYSTEM},
        {"role": "user", "content": f"Sentences: {first_sentence}"},
    ]
    text = _apply_chat_template(clf_tokenizer, messages, enable_thinking=False)
    inputs = clf_tokenizer([text], return_tensors="pt").to(first_device)
    with torch.no_grad():
        generated_ids = clf_model.generate(**inputs, max_new_tokens=16, do_sample=False)
    # Keep only the newly generated tokens (drop the echoed prompt).
    trimmed = [out[len(inp):] for inp, out in zip(inputs.input_ids, generated_ids)]
    raw = clf_tokenizer.batch_decode(
        trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False
    )[0].strip()
    return _clean_extracted_object(raw)


def _format_point(point):
    """Render a point as ``"(x, y)"``, or ``""`` when *point* is falsy."""
    return f"({point[0]}, {point[1]})" if point else ""


def run_context_with_remap(question, image_path, depth_path, model_kwargs, clf_kwargs, direction_map):
    """Answer *question*, optionally adding an answer for a remapped direction.

    Steps:
      1. Run the model on the original question.
      2. If ``should_remap`` says no, return that answer as-is.
      3. Otherwise ask the model which way the target object is facing, look
         up the remapped direction in ``direction_map`` and, if one exists,
         run the model again on the rewritten question.

    Returns:
        * the raw first answer when no remap applies;
        * the first coordinate string (or the raw first answer if no point
          could be extracted) when the facing direction yields no mapping;
        * otherwise the extracted coordinates of both answers joined by a
          space, falling back to whichever raw answer is non-empty when
          neither contains a point.
    """
    import torch
    from robobrain_runner import run_robobrain
    from evaluation import _extract_first_point

    do_remap, orig_dir_canonical, orig_dir_phrase = should_remap(question)

    # The first pass is identical whether or not a remap follows.
    answer1 = run_robobrain(question, image_path, depth_path, model_kwargs, LM_classify="context")
    if not do_remap:
        return answer1

    coord1_tuple, _ = _extract_first_point(answer1)
    coord1_str = _format_point(coord1_tuple)

    object_name = _extract_object_from_question(question, clf_kwargs)
    torch.cuda.empty_cache()

    obj_label = object_name.strip() or "object"
    dir_prompt = _FACING_PROMPT_TEMPLATE.format(object_name=obj_label)
    dir_answer = run_robobrain(dir_prompt, image_path, depth_path, model_kwargs, add_think_override=False)

    facing_key = _parse_facing_direction(dir_answer)
    new_dir = _map_direction(orig_dir_canonical, facing_key, direction_map)
    if new_dir is None:
        return coord1_str or answer1

    new_question = _replace_direction_in_question(question, orig_dir_phrase, new_dir)
    answer2 = run_robobrain(new_question, image_path, depth_path, model_kwargs, LM_classify="context")
    coord2_tuple, _ = _extract_first_point(answer2)
    coord2_str = _format_point(coord2_tuple)

    parts = [p for p in (coord1_str, coord2_str) if p]
    return " ".join(parts) if parts else (answer1 or answer2)