Queued IO

One of the main draws of larcv for machine learning is the ability to use the queued data reader. This preloads the next batch of data (you can pick the entries yourself, or let larcv step through your dataset randomly) while you are working on the current one.

Often, an application's IO needs are specific enough that it is difficult to write a generic high-level interface that turns this into a one-line-of-code situation. So, this tutorial aims to show the highest-level interface and how you can use it, hopefully providing enough of a starting point to move forward.

import numpy
import larcv

Step 0 - Creating an input file

Obviously, skip this step when you are ready with your own input file, but here’s a short step to generate an input file.

def create_tensor2d(image_meta, n_projection_ids=1, dense_shape=(64, 64)):
    """Create and return a list of larcv.Tensor2D objects with random dense data.

    Args:
        image_meta: kept for interface compatibility (currently unused --
            the dense shape is taken from ``dense_shape`` instead).
        n_projection_ids: number of Tensor2D objects (projections) to create.
        dense_shape: 2D shape of the random data filled into each tensor.

    Returns:
        list of larcv.Tensor2D, one per projection ID in [0, n_projection_ids).
    """
    tensor_2ds = []
    for projection_id in range(n_projection_ids):
        data = numpy.random.uniform(size=dense_shape).astype("float32")
        tensor = larcv.Tensor2D(data)
        # Creating from numpy automatically sets the projection ID to 0, so fix that:
        tensor.set_projection_id(projection_id)
        tensor_2ds.append(tensor)
    return tensor_2ds
# What should the name of the file be?
output = "queue_io_input.h5"

# Create an output larcv file, using the WRITE mode
io_manager = larcv.IOManager(larcv.IOManager.kWRITE)
io_manager.set_out_file(str(output))
io_manager.initialize()

n_events = 10

# Voxelization of the volume: 64 voxels per axis over [-50, 50] on each axis.
n_voxels = (64, 64, 64)
boundary_min = numpy.asarray((-50, -50, -50))
boundary_max = numpy.asarray(( 50,  50,  50))

# Compute the physical extent of the volume along each axis:
length = (boundary_max - boundary_min)

print(length)
# For this, we'll use one pre-defined image meta
meta = larcv.ImageMeta3D()
for i_axis in [0, 1, 2]:
    meta.set_dimension(i_axis, length[i_axis], n_voxels[i_axis], boundary_min[i_axis])

print(meta)

step_size = 0.5

for i in range(n_events):
    # Create some random data as input.

    # We're going to create lines of data, with random start points and directions,
    # And fill in voxels along those lines.

    # Draw the start point uniformly over the FULL volume.  Scaling by `length`
    # is required here: scaling by `boundary_max` alone would confine every
    # start point to the lower half of the volume ([-50, 0] per axis).
    start     = boundary_min + numpy.random.uniform(size=3) * length
    direction = numpy.random.uniform(size=3) - 0.5
    print(direction)

    voxel_set = larcv.VoxelSet()

    p = start

    # Loop until this trajectory exits the space:
    while (p > boundary_min).all() and (p < boundary_max).all():

        # Use ImageMeta to figure out what voxel this goes into:
        index = meta.position_to_index(p)
        # larcv will automatically check if this voxel already exists and overwrite with "insert"
        # (Use "add" to add values instead)
        voxel_set.insert(larcv.Voxel(index, value=1.0))

        p = p + step_size * direction

    # Now, store this set of voxels in the event:
    event_sparse_tensor = io_manager.get_data("sparse3d", "queue_demo")
    event_sparse_tensor.set(voxel_set, meta)

    # Let's also save the start position and momentum using a Particle object:
    particle = larcv.Particle()
    particle.position(*start, 0.0)
    particle.momentum(*direction)

    event_particle = io_manager.get_data("particle", "queue_demo")

    event_particle.append(particle)

    # Save the data:
    io_manager.save_entry()

io_manager.finalize()
[100 100 100]
ProjectionID 0
  N Voxels: (64, 64, 64)
  Image Size: (100, 100, 100)
  Origin: (-50, -50, -50)
  Valid: True

[-0.36722953 -0.48778355 -0.30883679]
[ 0.12968829  0.05789533 -0.24592748]
[0.32055465 0.3936945  0.34927541]
[0.04715452 0.24217907 0.09158854]
[ 0.24085695  0.1979712  -0.20836901]
[ 0.45615887 -0.45908206  0.04478502]
[-0.32747276 -0.07803196  0.00751879]
[ 0.33289431 -0.16932123 -0.278194  ]
[ 0.09493213  0.26192306 -0.49691674]
[-0.44972895 -0.12308742 -0.06972504]
    [NORMAL]  <IOManager::finalize> Closing output file

Step 1 - Configuring

To use the Queue IO layer, we have to provide it with a proper config file. We try to make this easy with larcv.config_builder:

# Create the config builder and turn verbosity down low:
from larcv.config_builder import ConfigBuilder

cb = ConfigBuilder()

# Input files are passed as a list, because several files can be combined here:
cb.set_parameter(["queue_io_input.h5"], "InputFiles")

# Set verbosity to 5 (quiet) at every level of the configuration hierarchy:
for key_path in (("ProcessDriver", "IOManager"), ("ProcessDriver",), ()):
    cb.set_parameter(5, *key_path, "Verbosity")

Config Builder works by adding batch_fillers and preprocess apps:

name = "demo"

# Batch filler for the image data (sparse 3D tensors written above):
cb.add_batch_filler(
    datatype  = "sparse3d",
    producer  = "queue_demo",
    name      = name+"data", # This is the name in the output dict to place this data
    MaxVoxels = 1000, # Zero-pad each event's voxel list up to this length
    Augment   = False, # Apply on-the-fly augmentation
    Channels  = [0,] # Which projection IDs to read out (a list of IDs, not a count)
)

# Batch filler for the labels (Particle objects written above):
cb.add_batch_filler(
    datatype  = "particle",
    producer  = "queue_demo",
    name      = name+"label",
)


# Build up the data_keys: maps the friendly key in the fetched dict
# to the batch-filler names declared above.
data_keys = {
    'image': name + 'data',
    'label': name + 'label'
    }

import json
# Dump the fully-expanded configuration for inspection:
print(json.dumps(cb.get_config(), indent=2))
{
  "InputFiles": [
    "queue_io_input.h5"
  ],
  "ProcessDriver": {
    "EnableFilter": false,
    "IOManager": {
      "IOMode": 0,
      "Input": {
        "InputFiles": [],
        "ReadOnlyName": [],
        "ReadOnlyType": [],
        "UseH5CoreDriver": false
      },
      "Output": {
        "Compression": 1,
        "OutFileName": "",
        "StoreOnlyName": [],
        "StoreOnlyType": []
      },
      "Verbosity": 5
    },
    "NumEntries": 0,
    "ProcessList": {
      "demodata": {
        "Augment": false,
        "Channels": [
          0
        ],
        "IncludeValues": true,
        "MaxVoxels": 1000,
        "Producer": "queue_demo",
        "UnfilledVoxelValue": -999.0
      },
      "demolabel": {
        "Producer": "queue_demo"
      }
    },
    "ProcessName": [
      "demodata",
      "demolabel"
    ],
    "ProcessType": [
      "BatchFillerSparseTensor3D",
      "BatchFillerParticle"
    ],
    "RandomAccess": false,
    "RandomSeed": 0,
    "StartEntry": 0,
    "Verbosity": 5
  },
  "ProcessName": 0,
  "Verbosity": 5
}

A lot of this configuration gets filled in for you, automatically, because larcv has defaults for every setting.

Next, we create a queue loader:

from larcv.queueloader import queue_interface
# from larcv.distributed_queue_interface import queue_interface # This is the multi-node, parallel version!

# NOTE(review): this rebinding shadows the imported `queue_interface` class with
# the instance, so a second instantiation afterwards would fail.  Kept as-is
# because the rest of this tutorial refers to the instance by this name.
queue_interface = queue_interface(random_access_mode="random_blocks", seed=1234)
queue_interface.no_warnings()

# This gets the queue interface started:
batch_size=4


# Prepare data managers: (You can have more than one!  Train / test / anything)
io_config = {
    'filler_name' : name,             # tag for this manager; must match the name used below
    'filler_cfg'  : cb.get_config(),  # the configuration dict built above
    'verbosity'   : 5,
    'make_copy'   : False # This is if you want to make a copy of data in python
}


# color=None: presumably only relevant for the distributed (multi-rank) version -- verify there.
queue_interface.prepare_manager(name, io_config, batch_size, data_keys, color=None)
   [WARNING]  <demodata::configure> BatchFillerSparseTensor3D is only supported with one channel!

Now that everything is configured, you can load data with the queue interface:

# Fetch one batch: returns a dict keyed by the names in `data_keys`
# (plus bookkeeping keys, since fetch_meta_data=True).
data_dict = queue_interface.fetch_minibatch_data(name, pop=True,fetch_meta_data=True)
print(data_dict.keys())
dict_keys(['image', 'label', 'entries', 'event_ids'])
# Print the vertex locations of this data:
# (`_vtx` is a field of the structured particle-label array; it holds the
# start positions saved via Particle.position above.)
print(data_dict['label']["_vtx"])
[[( -4.97279092,  -9.99608904, -13.55535244, 0.)]
 [(-32.51733221, -14.16470064, -38.04297486, 0.)]
 [(-14.84052472, -38.76055026, -46.04056107, 0.)]
 [(-44.37679479, -45.36842982, -41.02790002, 0.)]]

If you call prepare_next while you are still working on the current batch, you will launch a C++ thread in the background to read the next batch of data in parallel:

# Kick off a background read of the NEXT batch while we keep using this one:
queue_interface.prepare_next(name) # prepare more data for the dataset tagged with `name`
# If you call with `pop` = False, you get back the same data:
re_data_dict = queue_interface.fetch_minibatch_data(name, pop=False,fetch_meta_data=True)
print(re_data_dict['label']["_vtx"])
[[( -4.97279092,  -9.99608904, -13.55535244, 0.)]
 [(-32.51733221, -14.16470064, -38.04297486, 0.)]
 [(-14.84052472, -38.76055026, -46.04056107, 0.)]
 [(-44.37679479, -45.36842982, -41.02790002, 0.)]]
# Calling with pop = True will discard this data after you use it - so you get it one more time,
# but the next time is different.  In general, you usually want pop = True.
re_data_dict = queue_interface.fetch_minibatch_data(name, pop=True,fetch_meta_data=True)
# Queue the following batch while this one is in use:
queue_interface.prepare_next(name)
print(re_data_dict['label']["_vtx"])
[[( -4.97279092,  -9.99608904, -13.55535244, 0.)]
 [(-32.51733221, -14.16470064, -38.04297486, 0.)]
 [(-14.84052472, -38.76055026, -46.04056107, 0.)]
 [(-44.37679479, -45.36842982, -41.02790002, 0.)]]
# The previous pop=True consumed the old batch, so this fetch returns new data:
re_data_dict = queue_interface.fetch_minibatch_data(name, pop=True,fetch_meta_data=True)
queue_interface.prepare_next(name)
print(re_data_dict['label']["_vtx"])
[[(-29.01198198, -47.82635937, -40.97786919, 0.)]
 [( -5.82826238, -15.16454682,  -2.67046139, 0.)]
 [( -2.27657811, -14.32547878, -13.37233451, 0.)]
 [(-17.61627359, -41.59867992, -15.51601294, 0.)]]

Summary

The queue interface provides a way to streamline data readback, letting you write a few lines of python to build a config, start a loader, and then you can get your dataset in one line per iteration. More - and better - documentation is a work in progress and will be coming soon!