File size: 2,633 Bytes
6073e55
23fdbc0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# Copyright (c) 2025 Ye Liu. Licensed under the BSD-3-Clause License.

import csv

import nncore

from videomind.dataset.hybrid import DATASETS
from videomind.dataset.wrappers import AnsweringCropDataset, AnsweringDataset, GroundingDataset
from videomind.utils.parser import parse_query, parse_question


@DATASETS.register(name='nextgqa')
class NExTGQADataset(AnsweringDataset):

    ANNO_PATH_VALID = 'data/nextgqa/val.csv'
    ANNO_PATH_TEST = 'data/nextgqa/test.csv'

    SPAN_PATH_VALID = 'data/nextgqa/gsub_val.json'
    SPAN_PATH_TEST = 'data/nextgqa/gsub_test.json'

    VIDEO_ID_MAP = 'data/nextgqa/map_vid_vidorID.json'
    VIDEO_ROOT = 'data/nextqa/videos'

    SOURCE = 'nextgqa'
    DATA_TYPE = 'multimodal'

    UNIT = 0.1

    @classmethod
    def load_annos(self, split='valid'):
        assert split in ('valid', 'test')

        if split == 'valid':
            anno_path = self.ANNO_PATH_VALID
            raw_spans = nncore.load(self.SPAN_PATH_VALID)
        else:
            anno_path = self.ANNO_PATH_TEST
            raw_spans = nncore.load(self.SPAN_PATH_TEST)

        with open(anno_path, mode='r') as f:
            reader = csv.DictReader(f)
            raw_annos = [d for d in reader]

        video_id_map = nncore.load(self.VIDEO_ID_MAP)

        annos = []
        for raw_anno in raw_annos:
            vid = raw_anno['video_id']
            qid = raw_anno['qid']

            video_id = video_id_map[vid]

            query = parse_query(raw_anno['question'].capitalize() + '?')
            question = parse_question(raw_anno['question'].capitalize() + '?')
            options = [raw_anno[k].capitalize() for k in ('a0', 'a1', 'a2', 'a3', 'a4')]
            answer = raw_anno['answer'].capitalize()
            ans = chr(ord('A') + options.index(answer))

            anno = dict(
                source=self.SOURCE,
                data_type=self.DATA_TYPE,
                video_path=nncore.join(self.VIDEO_ROOT, video_id + '.mp4'),
                duration=raw_spans[vid]['duration'],
                query=query,
                question=question,
                options=options,
                answer=answer,
                ans=ans,
                span=raw_spans[vid]['location'][qid],
                task=raw_anno['type'])

            annos.append(anno)

        return annos


@DATASETS.register(name='nextgqa_crop')
class NExTGQACropDataset(AnsweringCropDataset, NExTGQADataset):

    SOURCE = 'nextgqa_crop'


@DATASETS.register(name='nextgqa_grounding')
class NExTGQAGroundingDataset(GroundingDataset, NExTGQADataset):

    SOURCE = 'nextgqa_grounding'
    DATA_TYPE = 'grounding'