advanced-need-help
  • u

    user

    03/11/2022, 7:38 AM
    How to fetch complex MongoDB Data from Kedro? https://stackoverflow.com/questions/71435031/how-to-fetch-complex-mongodb-data-from-kedro
  • w

    Walber Moreira

    03/11/2022, 2:30 PM
    Be careful with this workaround because it may run into performance issues when there is a lot of data.
  • d

    datajoely

    03/11/2022, 2:33 PM
    Good point
  • w

    Walber Moreira

    03/11/2022, 2:34 PM
    Guys, I have a question: which is the proper runner to use on Databricks? I've been testing the run time using the 3 runners, but I'm still not comfortable enough to decide which one is better. I've searched the internet but haven't found a direct topic on this 😦
  • d

    datajoely

    03/11/2022, 2:35 PM
    ThreadRunner is for Spark and other remote execution workloads.
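For reference, the runner can be selected on the command line or constructed directly; a minimal sketch, assuming the pipeline and catalog objects come from your Kedro project:
python
# Pick a runner on the CLI:
#   kedro run --runner=ThreadRunner
# or construct one in code. ThreadRunner suits Spark and other remote-execution
# workloads, since the heavy lifting happens on the cluster rather than locally.
from kedro.runner import SequentialRunner, ThreadRunner

runner = ThreadRunner(max_workers=4)   # or SequentialRunner()
runner.run(pipeline, catalog)          # `pipeline` and `catalog` are assumed to come from your project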
  • w

    Walber Moreira

    03/11/2022, 2:40 PM
    Thx! My test results indicated a 40% reduction in run time when running on our main, larger cluster (used by the whole team), but when running in an isolated job cluster there wasn't any notable difference between sequential vs thread. So I kinda got confused
  • w

    Walber Moreira

    03/11/2022, 2:40 PM
    But thx Joel!
  • d

    datajoely

    03/11/2022, 2:44 PM
    So also remember that Spark is doing its own distribution optimisation in the background - we're telling it which chunks it can run once isolated branches have completed, but it will also be doing its own resource management behind the scenes. Another angle is to maybe look at optimising the Spark cluster or digging into the query plans of some of the long-running processes.
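One concrete way to dig into a query plan, for reference, is PySpark's DataFrame.explain; a minimal sketch, where the input path and column name are purely illustrative:
python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Illustrative: read whatever table the slow node consumes and print its plan
df = spark.read.parquet("path/to/slow_table")
df.groupBy("some_column").count().explain(True)   # parsed / analysed / optimised / physical plans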
  • d

    Deep

    03/11/2022, 2:46 PM
    Got it
  • d

    datajoely

    03/11/2022, 2:47 PM
    Also, it just occurred to me that transcoding may be useful for your situation: https://kedro.readthedocs.io/en/stable/05_data/01_data_catalog.html#transcoding-datasets
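For reference, transcoding registers the same underlying file under one name with several dataset types, so a Spark node and a pandas node can share it; a sketch along the lines of the linked docs, with illustrative names and paths:
yaml
my_dataframe@spark:
  type: spark.SparkDataSet
  filepath: data/02_intermediate/my_dataframe.parquet
  file_format: parquet

my_dataframe@pandas:
  type: pandas.ParquetDataSet
  filepath: data/02_intermediate/my_dataframe.parquet
A node can then write my_dataframe@spark and a downstream node can read my_dataframe@pandas without an explicit conversion step.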
  • w

    Walber Moreira

    03/11/2022, 3:44 PM
    Yep! We will be trying it on dev this week so we can gather more cost data to make a decision. Thx for the inputs (as always)
  • b

    boazmohar

    03/11/2022, 4:07 PM
    Hi, I just started to use Kedro and my use case is the analysis of image data (no ML for now) using dask. I have defined a custom data type that reads my data and metadata. I know how to pass the data to a node in my pipeline, but what about metadata? Should I pass the custom data type class as input?
  • d

    datajoely

    03/11/2022, 4:15 PM
    Hi @User I think you're looking for Parameters! https://kedro.readthedocs.io/en/latest/04_kedro_project_setup/02_configuration.html#parameters
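For reference, parameters live in conf/base/parameters.yml and are passed to nodes by name with the params: prefix; a minimal sketch, with the dataset, parameter and function names purely illustrative:
python
# conf/base/parameters.yml (illustrative):
#   channel_filters:
#     2: YFP
#     3: PSD95
from kedro.pipeline import node

def describe_channels(images, channel_filters):
    # `images` is the loaded dataset (unused here, shown only to illustrate wiring);
    # `channel_filters` is the dict defined in parameters.yml
    return {"n_channels": len(channel_filters), "names": list(channel_filters.values())}

summary_node = node(
    describe_channels,
    inputs=["my_images", "params:channel_filters"],
    outputs="channel_summary",
)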
  • b

    boazmohar

    03/11/2022, 4:21 PM
    That might work, I just don't see the flow yet... I have an xml file in the folder where the data is, which I read using my custom data class, so it is living as a property of the class. Is that not the intended use case? I was going to use https://kedro.readthedocs.io/en/latest/06_nodes_and_pipelines/03_modular_pipelines.html. Does this make any sense? prep_pipeline = pipeline(pipe=cook_pipeline, inputs={"food": "grilled_veg", "param1": my_classname.xmldata})
  • d

    datajoely

    03/11/2022, 4:23 PM
    Could you do this in the Custom class?
    yaml
    my_data:
       type: MyCustomImageClass
       data_location: path/to/files
       metadata_location: path/to/spec.xml
  • d

    datajoely

    03/11/2022, 4:23 PM
    or does the metadata_location need to be dynamic?
  • d

    datajoely

    03/11/2022, 4:25 PM
    Typically the nodes shouldn't be aware of IO config because we want things to be reproducible, so I'm trying to push you towards being super declarative in the catalog
  • d

    datajoely

    03/11/2022, 4:26 PM
    There are ways of crossing that boundary via Hooks, which may be useful here
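For reference, a hook can see the catalog (and therefore the dataset instances) at run time; a minimal sketch, where the hook class, dataset name and xml attribute are illustrative:
python
from kedro.framework.hooks import hook_impl

class MetadataHooks:
    @hook_impl
    def after_catalog_created(self, catalog):
        # Illustrative: fetch the dataset instance from the catalog and keep
        # whatever it parsed (e.g. the xml metadata) for later use
        dataset = catalog.datasets.my_images
        self.xml = getattr(dataset, "xml", None)

# Registered in the project's settings.py with:  HOOKS = (MetadataHooks(),)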
  • b

    boazmohar

    03/11/2022, 4:28 PM
    No, it is fixed with regard to data_location, so there is not even a need for an xml path... Here is part of the class
    python
    # (imports added for completeness; get_meta_alpha3 and load_tiff_stack are
    # project helpers defined elsewhere)
    import glob
    import logging
    import os
    from pathlib import PurePosixPath
    from typing import Any, Dict

    import dask
    import dask.array as da
    import fsspec
    import numpy as np
    from kedro.io import AbstractDataSet
    from kedro.io.core import get_filepath_str, get_protocol_and_path

    logger = logging.getLogger(__name__)


    class DaskAlpha3TifsDataset(AbstractDataSet):
        def __init__(self, filepath: str, params: Dict[str, Any] = None):
            # parse the path and protocol (e.g. file, http, s3, etc.)
            protocol, path = get_protocol_and_path(filepath)
            self._protocol = protocol
            self._filepath = PurePosixPath(path)
            self._fs = fsspec.filesystem(self._protocol)
            load_path = get_filepath_str(self._filepath, self._protocol)
            self.xml = get_meta_alpha3(load_path)
            self.xml['filters'] = params
            self.xml['ch_names'] = [params[i] for i in self.xml['filter_order']]

        def _load(self) -> da.Array:
            """Loads data from the image files.

            Returns:
                Data from the image files as a lazy dask array
            """
            # using get_filepath_str ensures that the protocol and path are appended correctly for different filesystems
            file_shapes = self.xml['file_shapes']
            base_dir = self.xml['base_dir']
            files = glob.glob(os.path.join(base_dir, 'Raw', '*.tiff'))
            logger.info(f'Found {len(files)} files in {base_dir} Raw folder')
            sizes = [(file_shapes[3], file_shapes[4])] * len(files)
            delay = [dask.delayed(load_tiff_stack)(fn) for fn in files]
            both = list(zip(delay, sizes))
            slices = [slice(i, i + file_shapes[2]) for i in range(0, len(both), file_shapes[2])]
            lazy_arrays = [da.from_delayed(x, shape=y, dtype=np.uint16) for x, y in both]
            lazy_arrays_conZ = [da.stack(lazy_arrays[s], axis=0) for s in slices]
            lazy_arrays_conTileCh = da.stack(lazy_arrays_conZ, axis=0).reshape(file_shapes[[5, 0, 1, 2, 3, 4]])
            return lazy_arrays_conTileCh
  • b

    boazmohar

    03/11/2022, 4:29 PM
    get_meta_alpha3 knows how to find the xml based on the path
  • d

    datajoely

    03/11/2022, 4:29 PM
    so that looks okay - is there a problem with it? or are you just asking if this is kedrific or not?
  • b

    boazmohar

    03/11/2022, 4:29 PM
    I think I am fine with this, I am asking how to feed this into a node
  • d

    datajoely

    03/11/2022, 4:30 PM
    the params however would be in the catalog definition NOT the node
  • b

    boazmohar

    03/11/2022, 4:30 PM
    yes, this is my catalog entry:
    yml
    gel3_round1:
      type: alpha3_expand.extra.datasets.dask_alpha3_tifs_dataset.DaskAlpha3TifsDataset
      filepath: /nrs/svoboda/moharb/ExM/Alpha3/20220310_YFP_ANM1_Gel3_R1_v2/Basal/
      params:
        2: YFP
        3: PSD95
        4: GluA1
  • d

    datajoely

    03/11/2022, 4:30 PM
    yaml
    my_data:
       type: DaskAlpha3TifsDataset
       filepath: path/to/files
       params: 
         xml: path/to/spec.xml
  • b

    boazmohar

    03/11/2022, 4:31 PM
    I can move it. But regarding what a node would get: it gets the loaded data (DaskArray), but how do I access self.xml?
  • b

    boazmohar

    03/11/2022, 4:35 PM
    Maybe my data type returned from DaskAlpha3TifsDataset._load could be a dict with the raw data and metadata?
  • b

    boazmohar

    03/11/2022, 4:39 PM
    python
     def _load(self) -> Dict[str, Any]:
            """Loads data from the image files.

            Returns:
                A dict with the lazy dask array under 'data' and the parsed xml metadata under 'meta'
            """
            # using get_filepath_str ensures that the protocol and path are appended correctly for different filesystems
            file_shapes = self.xml['file_shapes']
            base_dir = self.xml['base_dir']
            files = glob.glob(os.path.join(base_dir, 'Raw', '*.tiff'))
            logger.info(f'Found {len(files)} files in {base_dir} Raw folder')
            sizes = [(file_shapes[3], file_shapes[4])] * len(files)
            delay = [dask.delayed(load_tiff_stack)(fn) for fn in files]
            both = list(zip(delay, sizes))
            slices = [slice(i, i + file_shapes[2]) for i in range(0, len(both), file_shapes[2])]
            lazy_arrays = [da.from_delayed(x, shape=y, dtype=np.uint16) for x, y in both]
            lazy_arrays_conZ = [da.stack(lazy_arrays[s], axis=0) for s in slices]
            lazy_arrays_conTileCh = da.stack(lazy_arrays_conZ, axis=0).reshape(file_shapes[[5, 0, 1, 2, 3, 4]])
            return {'data': lazy_arrays_conTileCh, 'meta': self.xml}
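If _load returns a dict like this, a downstream node just unpacks it; a minimal sketch, where the node function name and the reduction are purely illustrative:
python
# Illustrative node: receives the dict produced by DaskAlpha3TifsDataset._load
def max_project(loaded):
    data = loaded['data']            # the lazy dask array
    meta = loaded['meta']            # the parsed xml metadata
    projection = data.max(axis=0)    # example reduction over the first axis
    return projection, meta['ch_names']
In the pipeline, the node's single input would then be the catalog entry (e.g. gel3_round1).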
  • d

    datajoely

    03/11/2022, 6:39 PM
    Sorry, I had to go offline @boazmohar. I can help you think through this over the weekend (or better, next Monday). I'm pretty sure you just want to add another constructor arg to your dataset and then access that via self.
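For what it's worth, that suggestion would look roughly like this; a sketch where metadata_path is a hypothetical extra constructor argument and the method bodies are stubbed:
python
from typing import Any, Dict

from kedro.io import AbstractDataSet


class DaskAlpha3TifsDataset(AbstractDataSet):
    def __init__(self, filepath: str, params: Dict[str, Any] = None,
                 metadata_path: str = None):
        # `metadata_path` is a hypothetical extra argument, populated straight
        # from the catalog entry and available later via self
        self._filepath = filepath
        self._params = params
        self._metadata_path = metadata_path

    def _load(self):
        ...  # build the dask array as before, reading metadata from self._metadata_path

    def _save(self, data):
        raise NotImplementedError("read-only dataset")

    def _describe(self):
        return dict(filepath=self._filepath, metadata_path=self._metadata_path)
The corresponding catalog entry would then gain a metadata_path key alongside filepath and params.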
  • b

    boazmohar

    03/11/2022, 8:36 PM
    No worries, I appreciate your time and effort on here and on the repo very much!