Source code for ocrtoc_dataset_toolkit.ocrtoc_dataset

# Author: Minghao Gou.

import numpy as np
import open3d as o3d
import matplotlib.pyplot as plt
from tqdm import tqdm
import os
import tempfile
import cv2

from .utils.logging import get_main_logger
from .utils.combine import combine, merge_pcds
from .utils.vis import get_random_color, generate_scene_pointcloud

logger = get_main_logger()

[docs]class OCRTOC_Dataset(): """OCRTOC Dataset toolkit class Args: root(str): root path for the dataset. """ def __init__(self, root): self.root = root self.scene_name_list = self.load_scene_name_list() self.object_name_list, self.object_id_dict = self.load_object_list() logger.info('obj name list:{}\nobj id dict:{}\nscene name list:{}'.format( self.object_name_list, self.object_id_dict, self.scene_name_list ) )
[docs] def load_object_list(self, save = False): """Load object list. Args: save(bool): save the object list or not. Returns: list, dict: object names and name-id mapping. """ if os.path.exists(os.path.join(self.root, 'object_name_list.txt')): f = open(os.path.join(self.root, 'object_name_list.txt')) object_name_list = f.readlines() f.close() object_id_dict = dict() for i in range(len(object_name_list)): object_name_list[i] = object_name_list[i].strip() object_id_dict[object_name_list[i]] = i + 1 return object_name_list, object_id_dict else: logger.warning('No existed object list, generate object list from scenes') object_name_set = set() for scene_id in range(len(self.scene_name_list)): scene_object_name_list = self.load_scene_object_list(scene_id) for object_name in scene_object_name_list: object_name_set.add(object_name) object_name_list = list(object_name_set) object_id_dict = dict() for i in range(len(object_name_list)): object_id_dict[object_name_list[i]] = i + 1 if save: logger.warning('Save object list to dataset folder') f = open(os.path.join(self.root, 'object_name_list.txt'), 'w') for object_name in object_name_list: f.write(object_name+'\n') f.close() return object_name_list, object_id_dict
[docs] def load_scene_name_list(self, save = False): """Load scene name list or generate one Returns: list: scene names. """ scene_list_file_path = os.path.join(self.root, 'scene_name_list.txt') if not os.path.exists(scene_list_file_path): logger.warning("No existed scene list, generate scene list from scenes") scene_name_list = os.listdir(os.path.join(self.root, 'scenes')) if save: logger.warning('Save scene name list to dataset folder') f = open(scene_list_file_path, 'w') for i in range(len(scene_name_list)): f.write(scene_name_list[i]+'\n') f.close() else: f = open(scene_list_file_path) scene_name_list = f.readlines() f.close() for i in range(len(scene_name_list)): scene_name_list[i] = scene_name_list[i].strip() return scene_name_list
[docs] def load_scene_image_number(self, scene_id): """Load images number in a scene Args: scene_id(int): scene index. Returns: int: images number in a scene. """ return len(os.listdir(os.path.join( self.root, 'scenes', self.load_scene_name(scene_id), 'rgb_undistort' )))
[docs] def load_scene_number(self): """Load total scenes number Returns: int: total scenes number in the dataset. """ return len(self.scene_name_list)
[docs] def load_object_number(self): """Load total objects number Returns: int: total objects number in the dataset. """ return len(self.object_name_list)
[docs] def load_total_image_number(self): """Load total images number Returns: int: total images number in the dataset. """ total_number = 0 for scene_id in range(self.load_scene_number()): total_number += self.load_scene_image_number(scene_id) return total_number
[docs] def load_scene_name(self, scene_id): """Load scene name Args: scene_id(int): scene index. Returns: str: scene name. """ return self.scene_name_list[scene_id]
[docs] def load_camera_param(self, scene_id): """Load camera fake intrinsic matrix for open3d, Args: scene_id(int): scene index. Returns: np.ndarray: fake camera intrinsic matrix. """ logger.warning("Camera parameter retrieved from 'load_camera_param' is not accurate, using 'load_real_camera_intrinsic' instead for accurate result") camera_intrin_matrix = np.load( os.path.join( self.root, 'scenes', self.load_scene_name(scene_id), 'color_camK.npy' ) ) param = o3d.camera.PinholeCameraParameters() param.extrinsic = np.eye(4,dtype=np.float64) param.intrinsic.set_intrinsics( 1280, 720, camera_intrin_matrix[0][0], camera_intrin_matrix[1][1], 639.5, 359.5, ) return param
[docs] def load_real_camera_intrinsic(self, scene_id): """Load camera real intrinsic matrix Args: scene_id(int): scene index. Returns: np.ndarray: camera intrinsic matrix. """ return np.load( os.path.join( self.root, 'scenes', self.load_scene_name(scene_id), 'color_camK.npy' ) )
[docs] def load_raw_image(self, scene_id, image_id, order = 'RGB'): """Load color image Args: scene_id(int): scene index. image_id(int): image index. order(str): RGB or BGR. Returns: np.ndarray: color image. """ raw_bgr = cv2.imread( os.path.join( self.root, 'scenes', self.load_scene_name(scene_id), 'rgb_undistort', '%04d.png' % image_id ) ) if order == 'BGR': return raw_bgr elif order == 'RGB': return cv2.cvtColor(raw_bgr, cv2.COLOR_BGR2RGB) else: raise ValueError('Unknown order {}, only RGB and BGR are allowed.'.format(order))
[docs] def load_depth_image(self, scene_id, image_id): """Load depth image Args: scene_id(int): scene index. image_id(int): image index. Returns: np.ndarray: depth image. """ depth = cv2.imread( os.path.join( self.root, 'scenes', self.load_scene_name(scene_id), 'depth_undistort', '%04d.png' % image_id ), cv2.IMREAD_UNCHANGED ) return depth
[docs] def load_camera_pose(self, scene_id, image_id): """Load camera poses Args: scene_id(int): scene index. image_id(int): image index. Returns: np.ndarray: camera pose with world frame. """ return np.load(os.path.join( self.root, 'scenes', self.load_scene_name(scene_id), 'camera_poses', '%04d.npy' % image_id ) )
[docs] def load_scene_object_list(self, scene_id): """Load object list in a scene Args: scene_id(int): scene index. Returns: list: object name list. """ f = open( os.path.join( self.root, 'scenes', self.load_scene_name(scene_id), 'object_list.txt' ) ) scene_object_name_list = f.readlines() f.close() for i in range(len(scene_object_name_list)): scene_object_name_list[i] = scene_object_name_list[i].strip() return scene_object_name_list
[docs] def load_object_mesh(self, model_name): """Load model mesh file Args: model_name(str): model name. Returns: o3d.geometry.TriangleMesh: model mesh. """ o3d_mesh = o3d.io.read_triangle_mesh( os.path.join( self.root, 'rgb_pcd', '{}.ply'.format(model_name) ) ) return o3d_mesh
[docs] def load_object_pose_dict(self, scene_id): """load object pose in each image Args: scene_id(int): scene index. Returns: dict: object poses. """ object_pose_dict = dict() for object_name in self.load_scene_object_list(scene_id): pose_file_path = os.path.join( self.root, 'scenes', self.load_scene_name(scene_id), 'object_poses', object_name+'.npy' ) if os.path.exists(pose_file_path): object_pose_dict[object_name] = np.load(pose_file_path) else: object_pose_dict[object_name] = None return object_pose_dict
[docs] def load_seg_mask(self, scene_id, image_id): """load segmentation mask Args: scene_id(int): scene index. image_id(int): image index. Returns: np.ndarray: segmentation mask. """ seg_mask_path = os.path.join( self.root, 'scenes', self.load_scene_name(scene_id), 'seg_masks', '%04d.npy' % image_id ) return np.load(seg_mask_path)
[docs] def load_point_cloud(self, scene_id, image_id): """Load partial view point cloud Args: scene_id(int): scene index. image_id(int): image index. Returns: o3d.geometry.PointCloud: partial view point cloud. """ pcd = generate_scene_pointcloud( depth_path = os.path.join( self.root, 'scenes', self.load_scene_name(scene_id), 'depth_undistort', '%04d.png' % image_id ), rgb_path = os.path.join( self.root, 'scenes', self.load_scene_name(scene_id), 'rgb_undistort', '%04d.png' % image_id ), intrinsics = self.load_real_camera_intrinsic(scene_id), depth_scale = 1000.0 ) return pcd
[docs] def load_scene_point_cloud(self, scene_id): """Load full view scene point cloud Args: scene_id(int): scene index. Returns: o3d.geometry.PointCloud: Reconstructed point cloud. """ pcds = [] # one in four of the scenes are used to calculate the full scene for image_id in range(0, self.load_scene_image_number(scene_id), 4): income_pcd = self.load_point_cloud(scene_id = scene_id, image_id = image_id) camera_pose = self.load_camera_pose(scene_id = scene_id, image_id = image_id) income_pcd.transform(camera_pose) pcds.append(income_pcd) full_pcd = combine(pcds) return full_pcd
[docs] def vis_6dpose(self, scene_id, image_id = None, dimension = 3, show = True): """Visualize 6d pose annotation in a scene or in an image Args: scene_id(int): the id of the scene. image_id(int or None): the id of the image, None for the whole scene. dimension(int): 2 for 2d visualization and 3 for 3d visualization. show(bool): whether to show the result using open3d. Returns: o3d.geometry.PointCloud or np.array: If dimension==2, returns point cloud; elif dimension == 3, returns BGR image. """ if dimension == 2: if image_id is not None: bgr = self.load_raw_image(scene_id, image_id, order = 'BGR') mask = self.load_seg_mask(scene_id, image_id) scene_object_list = self.load_scene_object_list(scene_id) for scene_object_name in scene_object_list: object_id = self.object_id_dict[scene_object_name] instance_mask = mask == object_id bgr[instance_mask] = bgr[instance_mask] // 2 + get_random_color() // 2 if show: plt.imshow(cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)) plt.show() return bgr else: raise ValueError('The image id must be given for 2d visualization') elif dimension == 3: geometry_list = [] if image_id is not None: image_pcd = self.load_point_cloud(scene_id, image_id) camera_pose = self.load_camera_pose(scene_id, image_id) image_pcd.transform(camera_pose) geometry_list.append(image_pcd) else: scene_pcd = self.load_scene_point_cloud(scene_id) geometry_list.append(scene_pcd) object_pose_dict = self.load_object_pose_dict(scene_id) scene_object_list = self.load_scene_object_list(scene_id) for scene_object_name in scene_object_list: if (scene_object_name in object_pose_dict.keys()) and (object_pose_dict[scene_object_name] is not None): o3d_mesh = self.load_object_mesh(scene_object_name) sixd_pose = object_pose_dict[scene_object_name] o3d_mesh.transform(sixd_pose) geometry_list.append(o3d_mesh.sample_points_uniformly(10000)) full_pcd = merge_pcds(geometry_list) if show: o3d.visualization.draw_geometries([full_pcd]) return full_pcd else: raise ValueError('Dimension must be 2 or 3')