from os.path import join as pjoin

from torch.utils import data
import numpy as np
import torch
from tqdm import tqdm
from torch.utils.data._utils.collate import default_collate
import random
import codecs as cs
from utils.glove import GloVe
from human_body_prior.body_model.body_model import BodyModel


#################################################################################
#                               Collate Function                                #
#################################################################################

def collate_fn(batch):
    batch.sort(key=lambda x: x[3], reverse=True)
    return default_collate(batch)
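
# Usage note (a sketch, not from the source): the evaluation tuples returned by
# the datasets below place sent_len at index 3, so this collate sorts each batch
# by caption length, longest first, before the default collation, e.g.
#   loader = data.DataLoader(eval_dataset, batch_size=32, collate_fn=collate_fn)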

#################################################################################
#                                   Datasets                                    #
#################################################################################

class AEDataset(data.Dataset):
    def __init__(self, mean, std, motion_dir, window_size, split_file):
        self.data = []
        self.lengths = []
        id_list = []
        with open(split_file, 'r') as f:
            for line in f.readlines():
                id_list.append(line.strip())

        for name in tqdm(id_list):
            try:
                motion = np.load(pjoin(motion_dir, name + '.npy'))
                if len(motion.shape) == 2:  # B,L,J,3
                    motion = np.expand_dims(motion, axis=0)
                if motion.shape[0] < window_size:
                    continue
                self.lengths.append(motion.shape[0] - window_size)
                self.data.append(motion)
            except Exception:
                # Skip motions that are missing or fail to load.
                pass

        self.cumsum = np.cumsum([0] + self.lengths)
        self.window_size = window_size
        self.mean = mean
        self.std = std
        print("Total number of motions {}, snippets {}".format(len(self.data), self.cumsum[-1]))

    def __len__(self):
        return self.cumsum[-1]

    def __getitem__(self, item):
        # Map the flat snippet index back to (motion_id, window offset)
        # through the cumulative-length table built in __init__.
        if item != 0:
            motion_id = np.searchsorted(self.cumsum, item) - 1
            idx = item - self.cumsum[motion_id] - 1
        else:
            motion_id = 0
            idx = 0
        motion = self.data[motion_id][idx:idx + self.window_size]
        # Z-normalization
        motion = (motion - self.mean) / self.std
        return motion
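
# Worked example of the cumsum lookup above (illustrative numbers, not from the
# source): with self.lengths == [5, 3], self.cumsum == [0, 5, 8], so the dataset
# exposes 8 snippets. For item == 6, np.searchsorted gives motion_id == 1 and
# idx == 6 - 5 - 1 == 0, i.e. the first window of the second motion.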

class AEMeshDataset(data.Dataset):
    def __init__(self, mean, std, motion_dir, window_size, split_file):
        self.data = []
        self.lengths = []
        id_list = []
        with open(split_file, 'r') as f:
            for line in f.readlines():
                id_list.append(line.strip())

        for name in id_list:
            try:
                motion = np.load(pjoin(motion_dir, name + '.npy'))
                if motion.shape[0] < window_size:
                    continue
                self.lengths.append(1)  # one snippet per motion
                self.data.append(motion)
            except Exception:
                # Skip motions that are missing or fail to load.
                pass

        self.cumsum = np.cumsum([0] + self.lengths)
        self.window_size = window_size
        self.mean = mean
        self.std = std

        num_betas = 10  # number of SMPL-H shape parameters
        num_dmpls = 8   # number of DMPL soft-tissue parameters
        self.bm = BodyModel(bm_fname='./body_models/smplh/neutral/model.npz',
                            num_betas=num_betas, num_dmpls=num_dmpls,
                            dmpl_fname='./body_models/dmpls/neutral/model.npz')
        print("Total number of motions {}, snippets {}".format(len(self.data), self.cumsum[-1]))
    def __len__(self):
        return self.cumsum[-1]

    def __getitem__(self, item):
        motion = self.data[item]
        # Per-frame SMPL-H parameter layout along the feature axis:
        # [0:3] root orientation, [3:66] body pose (21 joints x 3),
        # [66:156] hand pose (2 x 15 joints x 3), [156:159] translation,
        # [159:169] betas, [169:] DMPLs.
        body_parms = {
            'root_orient': torch.from_numpy(motion[:, :3]).float(),
            'pose_body': torch.from_numpy(motion[:, 3:66]).float(),
            'pose_hand': torch.from_numpy(motion[:, 66:52*3]).float(),
            'trans': torch.from_numpy(motion[:, 52*3:53*3]).float(),
            'betas': torch.from_numpy(motion[:, 53*3:53*3+10]).float(),
            'dmpls': torch.from_numpy(motion[:, 53*3+10:]).float()
        }
        # Zero out the shape parameters so every sequence uses the neutral body.
        body_parms['betas'] = torch.zeros_like(body_parms['betas'])
        with torch.no_grad():
            verts = self.bm(**body_parms).v
        # Shift vertically so the lowest vertex across the sequence sits at y = 0.
        verts[:, :, 1] -= verts[:, :, 1].min()
        # Sample a start frame so that a full window always fits.
        idx = random.randint(0, verts.shape[0] - self.window_size)
        verts = verts[idx:idx + self.window_size].detach().cpu().numpy()
        # Z-normalization
        verts = (verts - self.mean) / self.std
        return verts
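
# Usage note (a sketch; stats and paths are assumptions, not from the source):
# each __getitem__ call above runs the SMPL-H body model forward to regenerate
# vertices, so iteration is far more expensive than AEDataset, e.g.
#   dataset = AEMeshDataset(mean, std, motion_dir, window_size=64, split_file=split)
#   verts = dataset[0]  # (window_size, num_vertices, 3), z-normalized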

class Text2MotionDataset(data.Dataset):
    def __init__(self, mean, std, split_file, dataset_name, motion_dir, text_dir, unit_length, max_motion_length,
                 max_text_length, evaluation=False, is_mesh=False):
        self.evaluation = evaluation
        self.max_length = 20
        self.pointer = 0
        self.max_motion_length = max_motion_length
        self.max_text_len = max_text_length
        self.unit_length = unit_length
        min_motion_len = 40 if dataset_name == 't2m' else 24

        data_dict = {}
        id_list = []
        with cs.open(split_file, 'r') as f:
            for line in f.readlines():
                id_list.append(line.strip())

        new_name_list = []
        length_list = []
        for name in tqdm(id_list):
            try:
                motion = np.load(pjoin(motion_dir, name + '.npy'))
                if len(motion.shape) == 2:
                    motion = np.expand_dims(motion, axis=0)
                if is_mesh:
                    if len(motion) < min_motion_len:
                        continue
                else:
                    if len(motion) < min_motion_len or len(motion) >= 200:
                        continue
                text_data = []
                flag = False
                with cs.open(pjoin(text_dir, name + '.txt')) as f:
                    for line in f.readlines():
                        text_dict = {}
                        line_split = line.strip().split('#')
                        caption = line_split[0]
                        tokens = line_split[1].split(' ')
                        f_tag = float(line_split[2])
                        to_tag = float(line_split[3])
                        f_tag = 0.0 if np.isnan(f_tag) else f_tag
                        to_tag = 0.0 if np.isnan(to_tag) else to_tag
                        text_dict['caption'] = caption
                        text_dict['tokens'] = tokens
                        if f_tag == 0.0 and to_tag == 0.0:
                            flag = True
                            text_data.append(text_dict)
                        else:
                            try:
                                # Tags are in seconds; the data is at 20 fps.
                                n_motion = motion[int(f_tag*20): int(to_tag*20)]
                                if len(n_motion) < min_motion_len or len(n_motion) >= 200:
                                    continue
                                # Register the sub-segment under a fresh prefixed name.
                                new_name = random.choice('ABCDEFGHIJKLMNOPQRSTUVW') + '_' + name
                                while new_name in data_dict:
                                    new_name = random.choice('ABCDEFGHIJKLMNOPQRSTUVW') + '_' + name
                                data_dict[new_name] = {'motion': n_motion,
                                                       'length': len(n_motion),
                                                       'text': [text_dict]}
                                new_name_list.append(new_name)
                                length_list.append(len(n_motion))
                            except Exception:
                                print(line_split)
                                print(line_split[2], line_split[3], f_tag, to_tag, name)
                if flag:
                    data_dict[name] = {'motion': motion,
                                       'length': len(motion),
                                       'text': text_data}
                    new_name_list.append(name)
                    length_list.append(len(motion))
            except Exception:
                pass
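        # Each annotation line parsed above has the form
        #   caption#token/POS token/POS ...#f_tag#to_tag
        # where f_tag/to_tag delimit a sub-segment in seconds (frames = tag * 20)
        # and a 0.0/0.0 pair means the caption covers the whole motion.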
        if self.evaluation:
            self.w_vectorizer = GloVe('./glove', 'our_vab')
            # Sort by motion length so reset_max_len can slide a pointer.
            name_list, length_list = zip(*sorted(zip(new_name_list, length_list), key=lambda x: x[1]))
        else:
            name_list, length_list = new_name_list, length_list

        self.mean = mean
        self.std = std
        self.length_arr = np.array(length_list)
        self.data_dict = data_dict
        self.name_list = name_list
        if self.evaluation:
            self.reset_max_len(self.max_length)

    def reset_max_len(self, length):
        assert length <= self.max_motion_length
        self.pointer = np.searchsorted(self.length_arr, length)
        print("Pointer Pointing at %d" % self.pointer)
        self.max_length = length

    def transform(self, data, mean=None, std=None):
        if mean is None and std is None:
            return (data - self.mean) / self.std
        else:
            return (data - mean) / std

    def inv_transform(self, data, mean=None, std=None):
        if mean is None and std is None:
            return data * self.std + self.mean
        else:
            return data * std + mean
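    # Round-trip sketch (illustrative): transform and inv_transform are exact
    # inverses for the same stats, e.g.
    #   x == dataset.inv_transform(dataset.transform(x))  # up to float error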
    def __len__(self):
        return len(self.data_dict) - self.pointer

    def __getitem__(self, item):
        idx = self.pointer + item
        data = self.data_dict[self.name_list[idx]]
        motion, m_length, text_list = data['motion'], data['length'], data['text']

        # Randomly select a caption.
        text_data = random.choice(text_list)
        caption, tokens = text_data['caption'], text_data['tokens']

        if self.evaluation:
            if len(tokens) < self.max_text_len:
                # Pad with "unk" up to max_text_len (+2 for sos/eos).
                tokens = ['sos/OTHER'] + tokens + ['eos/OTHER']
                sent_len = len(tokens)
                tokens = tokens + ['unk/OTHER'] * (self.max_text_len + 2 - sent_len)
            else:
                # Crop to max_text_len, then add sos/eos.
                tokens = tokens[:self.max_text_len]
                tokens = ['sos/OTHER'] + tokens + ['eos/OTHER']
                sent_len = len(tokens)
            pos_one_hots = []
            word_embeddings = []
            for token in tokens:
                word_emb, pos_oh = self.w_vectorizer[token]
                pos_one_hots.append(pos_oh[None, :])
                word_embeddings.append(word_emb[None, :])
            pos_one_hots = np.concatenate(pos_one_hots, axis=0)
            word_embeddings = np.concatenate(word_embeddings, axis=0)

        # Round the sampled length down to a multiple of unit_length,
        # occasionally dropping one extra unit for augmentation.
        if self.unit_length < 10:
            coin2 = np.random.choice(['single', 'single', 'double'])
        else:
            coin2 = 'single'
        if coin2 == 'double':
            m_length = (m_length // self.unit_length - 1) * self.unit_length
        elif coin2 == 'single':
            m_length = (m_length // self.unit_length) * self.unit_length
        idx = random.randint(0, len(motion) - m_length)
        motion = motion[idx:idx + m_length]

        # Z-normalization
        motion = (motion - self.mean) / self.std

        # Zero-pad or randomly crop to max_motion_length.
        if m_length < self.max_motion_length:
            motion = np.concatenate([motion,
                                     np.zeros((self.max_motion_length - m_length, motion.shape[1], motion.shape[2]))
                                     ], axis=0)
        elif m_length > self.max_motion_length:
            idx = random.randint(0, m_length - self.max_motion_length)
            motion = motion[idx:idx + self.max_motion_length]

        if self.evaluation:
            return word_embeddings, pos_one_hots, caption, sent_len, motion, m_length, '_'.join(tokens)
        else:
            return caption, motion, m_length
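
# Usage sketch (arguments are assumptions, not from the source): in evaluation
# mode each item is (word_embeddings, pos_one_hots, caption, sent_len, motion,
# m_length, tokens), which is what collate_fn above sorts by sent_len, e.g.
#   dataset = Text2MotionDataset(mean, std, split_file, 't2m', motion_dir,
#                                text_dir, unit_length=4, max_motion_length=196,
#                                max_text_length=20, evaluation=True)
#   loader = data.DataLoader(dataset, batch_size=32, collate_fn=collate_fn)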

class Text2MotionDataset_Another_V(data.Dataset):
    def __init__(self, mean, std, split_file, dataset_name, motion_dir, text_dir, unit_length, max_motion_length,
                 max_text_length, evaluation=False, is_mesh=False):
        self.evaluation = evaluation
        self.max_length = 20
        self.pointer = 0
        self.max_motion_length = max_motion_length
        self.max_text_len = max_text_length
        self.unit_length = unit_length
        min_motion_len = 40 if dataset_name == 't2m' else 24

        data_dict = {}
        id_list = []
        with cs.open(split_file, 'r') as f:
            for line in f.readlines():
                id_list.append(line.strip())

        new_name_list = []
        length_list = []
        for name in tqdm(id_list):
            try:
                motion = np.load(pjoin(motion_dir, name + '.npy'))
                if not self.evaluation:
                    if len(motion.shape) == 2:
                        motion = np.expand_dims(motion, axis=0)
                if is_mesh:
                    if len(motion) < min_motion_len:
                        continue
                else:
                    if len(motion) < min_motion_len or len(motion) >= 200:
                        continue
                text_data = []
                flag = False
                with cs.open(pjoin(text_dir, name + '.txt')) as f:
                    for line in f.readlines():
                        text_dict = {}
                        line_split = line.strip().split('#')
                        caption = line_split[0]
                        tokens = line_split[1].split(' ')
                        f_tag = float(line_split[2])
                        to_tag = float(line_split[3])
                        f_tag = 0.0 if np.isnan(f_tag) else f_tag
                        to_tag = 0.0 if np.isnan(to_tag) else to_tag
                        text_dict['caption'] = caption
                        text_dict['tokens'] = tokens
                        if f_tag == 0.0 and to_tag == 0.0:
                            flag = True
                            text_data.append(text_dict)
                        else:
                            try:
                                # Tags are in seconds; the data is at 20 fps.
                                n_motion = motion[int(f_tag*20): int(to_tag*20)]
                                if len(n_motion) < min_motion_len or len(n_motion) >= 200:
                                    continue
                                new_name = random.choice('ABCDEFGHIJKLMNOPQRSTUVW') + '_' + name
                                while new_name in data_dict:
                                    new_name = random.choice('ABCDEFGHIJKLMNOPQRSTUVW') + '_' + name
                                data_dict[new_name] = {'motion': n_motion,
                                                       'length': len(n_motion),
                                                       'text': [text_dict]}
                                new_name_list.append(new_name)
                                length_list.append(len(n_motion))
                            except Exception:
                                print(line_split)
                                print(line_split[2], line_split[3], f_tag, to_tag, name)
                if flag:
                    data_dict[name] = {'motion': motion,
                                       'length': len(motion),
                                       'text': text_data}
                    new_name_list.append(name)
                    length_list.append(len(motion))
            except Exception:
                pass

        if self.evaluation:
            self.w_vectorizer = GloVe('./glove', 'our_vab')
            # Sort by motion length so reset_max_len can slide a pointer.
            name_list, length_list = zip(*sorted(zip(new_name_list, length_list), key=lambda x: x[1]))
        else:
            name_list, length_list = new_name_list, length_list

        self.mean = mean
        self.std = std
        self.length_arr = np.array(length_list)
        self.data_dict = data_dict
        self.name_list = name_list
        if self.evaluation:
            self.reset_max_len(self.max_length)
    def reset_max_len(self, length):
        assert length <= self.max_motion_length
        self.pointer = np.searchsorted(self.length_arr, length)
        print("Pointer Pointing at %d" % self.pointer)
        self.max_length = length

    def transform(self, data, mean=None, std=None):
        if mean is None and std is None:
            return (data - self.mean) / self.std
        else:
            return (data - mean) / std

    def inv_transform(self, data, mean=None, std=None):
        if mean is None and std is None:
            return data * self.std + self.mean
        else:
            return data * std + mean
| def __getitem__(self, item): | |
| idx = self.pointer + item | |
| data = self.data_dict[self.name_list[idx]] | |
| motion, m_length, text_list = data['motion'], data['length'], data['text'] | |
| # Randomly select a caption | |
| text_data = random.choice(text_list) | |
| caption, tokens = text_data['caption'], text_data['tokens'] | |
| if self.evaluation: | |
| if len(tokens) < self.max_text_len: | |
| # pad with "unk" | |
| tokens = ['sos/OTHER'] + tokens + ['eos/OTHER'] | |
| sent_len = len(tokens) | |
| tokens = tokens + ['unk/OTHER'] * (self.max_text_len + 2 - sent_len) | |
| else: | |
| # crop | |
| tokens = tokens[:self.max_text_len] | |
| tokens = ['sos/OTHER'] + tokens + ['eos/OTHER'] | |
| sent_len = len(tokens) | |
| pos_one_hots = [] | |
| word_embeddings = [] | |
| for token in tokens: | |
| word_emb, pos_oh = self.w_vectorizer[token] | |
| pos_one_hots.append(pos_oh[None, :]) | |
| word_embeddings.append(word_emb[None, :]) | |
| pos_one_hots = np.concatenate(pos_one_hots, axis=0) | |
| word_embeddings = np.concatenate(word_embeddings, axis=0) | |
| if self.unit_length < 10: | |
| coin2 = np.random.choice(['single', 'single', 'double']) | |
| else: | |
| coin2 = 'single' | |
| if coin2 == 'double': | |
| m_length = (m_length // self.unit_length - 1) * self.unit_length | |
| elif coin2 == 'single': | |
| m_length = (m_length // self.unit_length) * self.unit_length | |
| idx = random.randint(0, len(motion) - m_length) | |
| motion = motion[idx:idx+m_length] | |
| "Z Normalization" | |
| if self.evaluation: | |
| motion = motion[:, :self.mean.shape[0]] | |
| motion = (motion - self.mean) / self.std | |
| if m_length < self.max_motion_length: | |
| if self.evaluation: | |
| motion = np.concatenate([motion, | |
| np.zeros((self.max_motion_length - m_length, motion.shape[1])) | |
| ], axis=0) | |
| else: | |
| motion = np.concatenate([motion, | |
| np.zeros((self.max_motion_length - m_length, motion.shape[1], motion.shape[2])) | |
| ], axis=0) | |
| elif m_length > self.max_motion_length: | |
| if not self.evaluation: | |
| idx = random.randint(0, m_length - self.max_motion_length) | |
| motion = motion[idx:idx + self.max_motion_length] | |
| if self.evaluation: | |
| return word_embeddings, pos_one_hots, caption, sent_len, motion, m_length, '_'.join(tokens) | |
| else: | |
| return caption, motion, m_length |
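
# -----------------------------------------------------------------------------
# Minimal smoke-test sketch. Everything below is an assumption (paths, window
# size, stats layout) and not part of the original module; adjust to your
# checkout before running.
# -----------------------------------------------------------------------------
if __name__ == '__main__':
    mean = np.load('./dataset/mean.npy')  # assumed stats location
    std = np.load('./dataset/std.npy')    # assumed stats location
    dataset = AEDataset(mean, std,
                        motion_dir='./dataset/motions',    # assumed layout
                        window_size=64,
                        split_file='./dataset/train.txt')  # assumed layout
    loader = data.DataLoader(dataset, batch_size=4, shuffle=True, drop_last=True)
    print(next(iter(loader)).shape)  # (4, 64, ...) z-normalized windows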