import fsspec
import numpy as np
import ujson
from kerchunk.utils import refs_as_store
decoders = {}
def reg(name):
def f(func):
decoders[name] = func
return func
return f
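
# Decoders are dispatched by tag name from HDF4ToZarr.decode; registering one
# looks like this hypothetical sketch ("FOO" is a made-up tag for illustration):
#
#     @reg("FOO")
#     def _dec_foo(self, info):
#         return {"value": self.read_int(4)}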
class HDF4ToZarr:
"""Experimental: interface to HDF4 archival files"""
def __init__(
self,
path,
storage_options=None,
inline_threshold=100,
out=None,
):
self.path = path
self.st = storage_options
self.thresh = inline_threshold
self.out = out or {}
def read_int(self, n):
return int.from_bytes(self.f.read(n), "big")
def read_ddh(self):
return {"ndd": self.read_int(2), "next": self.read_int(4)}
def read_dd(self):
loc = self.f.tell()
i = int.from_bytes(self.f.read(2), "big")
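        # bit 0x4000 of the tag marks a "special" (extended) element whose
        # payload is described by a further header at the data offset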
if i & 0x4000:
extended = True
i = i - 0x4000
else:
extended = False
tag = tags.get(i, i)
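        # NULL tags carry no data; the multiplication below zeroes their
        # offset and length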
no_data = tag not in {"NULL"}
ref = (tag, int.from_bytes(self.f.read(2), "big"))
info = {
"offset": int.from_bytes(self.f.read(4), "big") * no_data,
"length": int.from_bytes(self.f.read(4), "big") * no_data,
"extended": extended,
"loc": loc,
}
return ref, info
    def decode(self, tag, info):
        self.f.seek(info["offset"])
        # fall back to returning the raw info if no decoder is registered
        ident = lambda _, __: info
        return decoders.get(tag, ident)(self, info)
def translate(self, filename=None, storage_options=None):
"""Scan and return references
Parameters
----------
filename: if given, write to this as JSON
storage_options: to interpret filename
Returns
-------
references
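
        Examples
        --------
        A minimal sketch; ``data/sample.hdf`` is a hypothetical local file::

            refs = HDF4ToZarr("data/sample.hdf").translate()
            HDF4ToZarr("data/sample.hdf").translate("refs.json")  # write JSON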
"""
import zarr
from kerchunk.codecs import ZlibCodec
fo = fsspec.open(self.path, **(self.st or {}))
self.f = fo.open()
# magic header
assert self.f.read(4) == b"\x0e\x03\x13\x01"
# all the data descriptors in a linked list
self.tags = {}
while True:
ddh = self.read_ddh()
for _ in range(ddh["ndd"]):
ident, info = self.read_dd()
self.tags[ident] = info
if ddh["next"] == 0:
# "finished" sentry
break
# or continue
self.f.seek(ddh["next"])
# basic decode
for tag, ref in self.tags:
self._dec(tag, ref)
# global attributes
attrs = {}
for (tag, ref), info in self.tags.items():
if tag == "VH" and info["names"][0].upper() == "VALUES":
# dtype = dtypes[info["types"][0]]
inf2 = self.tags[("VS", ref)]
self.f.seek(inf2["offset"])
# remove zero padding
data = self.f.read(inf2["length"]).split(b"\x00", 1)[0]
# NASA conventions
if info["name"].startswith(
("CoreMetadata.", "ArchiveMetadata.", "StructMetadata.")
):
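                    # the payload is ODL text; a hypothetical fragment:
                    #   OBJECT = ORBITNUMBER
                    #     VALUE = 12345
                    #   END_OBJECT = ORBITNUMBER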
obj = None
for line in data.decode().split("\n"):
if "OBJECT" in line:
obj = line.split()[-1]
if "VALUE" in line:
attrs[obj] = line.split()[-1].lstrip('"').rstrip('"')
# there should be only one root, and it's probably the last VG
# so maybe this loop isn't needed
roots = set()
children = set()
child = {}
for (tag, ref), info in self.tags.items():
if tag == "VG":
here = child.setdefault((tag, ref), set())
for t, r in zip(info["tag"], info["refs"]):
if t == "VG":
children.add((t, r))
roots.discard((t, r))
here.add((t, r))
if tag not in children:
roots.add((tag, ref))
# hierarchical output
output = self._descend_vg(*sorted(roots, key=lambda t: t[1])[-1])
prot = fo.fs.protocol
prot = prot[0] if isinstance(prot, tuple) else prot
store = refs_as_store(self.out, remote_protocol=prot, remote_options=self.st)
g = zarr.open_group(store, zarr_format=2, use_consolidated=False)
refs = {}
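        # every dict in the output is a dataset: make a zarr array whose
        # chunks reference byte ranges in the original HDF4 file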
for k, v in output.items():
if isinstance(v, dict):
compressor = ZlibCodec() if "refs" in v else None
arr = g.require_array(
name=k,
shape=v["dims"],
dtype=v["dtype"],
chunks=v.get("chunks", v["dims"]),
compressor=compressor,
)
arr.attrs.update(
dict(
_ARRAY_DIMENSIONS=(
[f"{k}_x", f"{k}_y"][: len(v["dims"])]
if "refs" in v
else ["0"]
),
**{
i: j.tolist() if isinstance(j, np.generic) else j
for i, j in v.items()
if i not in {"chunk", "dims", "dtype", "refs"}
},
)
)
for r in v.get("refs", []):
if r[0] == "DEFLATE":
continue
refs[f"{k}/{r[0]}"] = [self.path, r[1], r[2]]
else:
if not k.startswith(
("CoreMetadata.", "ArchiveMetadata.", "StructMetadata.")
):
attrs[k] = v.tolist() if isinstance(v, np.generic) else v
store.fs.references.update(refs)
g.attrs.update(attrs)
if filename is None:
return store.fs.references
        with fsspec.open(filename, "wt", **(storage_options or {})) as f:
            ujson.dump(dict(store.fs.references), f)
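
    # recursively walk a VGroup (VG) tree, gathering datasets and attributes
    # into nested dicts keyed by group/variable name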
def _descend_vg(self, tag, ref):
info = self.tags[(tag, ref)]
out = {}
for t, r in zip(info["tag"], info["refs"]):
inf2 = self.tags[(t, r)]
if t == "VG":
tmp = self._descend_vg(t, r)
if tmp and list(tmp)[0] == inf2["name"]:
tmp = tmp[inf2["name"]]
out[inf2["name"]] = tmp
elif t == "VH":
if len(inf2["names"]) == 1 and inf2["names"][0].lower() == "values":
dtype = dtypes[inf2["types"][0]]
name = inf2["name"]
inf2 = self.tags[("VS", r)]
self.f.seek(inf2["offset"])
data = self.f.read(inf2["length"])
if dtype == "str":
out[name] = (
data.split(b"\x00", 1)[0].decode().lstrip('"').rstrip('"')
) # decode() ?
else:
out[name] = np.frombuffer(data, dtype)[0]
elif t == "NT":
out["dtype"] = inf2["typ"]
elif t == "SD":
if isinstance(inf2["data"][-1], (tuple, list)):
out["refs"] = inf2["data"][:-1]
out["chunks"] = [_["chunk_length"] for _ in inf2["data"][-1]]
else:
out["refs"] = [inf2["data"]]
out["chunks"] = True
elif t == "SDD":
out["dims"] = inf2["dims"]
elif t == "NDG":
pass # out.setdefault("extra", []).append(_dec_ndg(self, inf2))
if out.get("chunks") is True:
out["chunks"] = out["dims"]
out["refs"] = [
[".".join(["0"] * len(out["dims"]))]
+ [out["refs"][0][1], out["refs"][0][2], out["refs"][0][0]]
]
return out
def _dec(self, tag, ref):
info = self.tags[(tag, ref)]
if not set(info) - {"length", "offset", "extended", "loc"}:
self.f.seek(info["offset"])
if info["extended"]:
info["data"] = self._dec_extended()
else:
info.update(self.decode(tag, info))
return info
def _dec_extended(self):
ext_type = spec[self.read_int(2)]
if ext_type == "CHUNKED":
return self._dec_chunked()
elif ext_type == "LINKED":
return self._dec_linked_header()
elif ext_type == "COMP":
return self._dec_comp()
def _dec_linked_header(self):
# get the bytes of a linked set - these will always be inlined
self.read_int(4) # length
self.read_int(4) # blk_len
self.read_int(4) # num_blk
next_ref = self.read_int(2)
out = []
while next_ref:
next_ref, data = self._dec_linked_block(self.tags[("LINKED", next_ref)])
out.extend([d for d in data if d])
bits = []
for ref in out:
info = self.tags[("LINKED", ref)]
self.f.seek(info["offset"])
bits.append(self.f.read(info["length"]))
return b"".join(bits)
def _dec_linked_block(self, block):
self.f.seek(block["offset"])
next_ref = self.read_int(2)
refs = [self.read_int(2) for _ in range((block["length"] // 2) - 1)]
return next_ref, refs
def _dec_chunked(self):
# we want to turn the chunks table into references
# tag_head_len = self.read_int(4)
# version = self.f.read(1)[0]
# flag = self.read_int(4)
# elem_tot_len = self.read_int(4)
# chunk_size = self.read_int(4)
# nt_size = self.read_int(4)
self.f.seek(21, 1)
chk_tbl_tag = tags[self.read_int(2)] # should be VH
chk_tbl_ref = self.read_int(2)
self.read_int(2) # sp_tab = tags[self.read_int(2)]
self.read_int(2) # sp_ref
ndims = self.read_int(4)
dims = [ # we don't use these, could skip
{
"flag": self.read_int(4),
"dim_length": self.read_int(4),
"chunk_length": self.read_int(4),
}
for _ in range(ndims)
]
        # fill_value, to be interpreted as a number later; but the chunk
        # table probably has no fill
        self.f.read(self.read_int(4))
# self.f.seek(12*ndims + 4, 1) # if skipping
header = self._dec(chk_tbl_tag, chk_tbl_ref)
data = self._dec("VS", chk_tbl_ref)["data"] # corresponding table
# header gives the field pattern for the rows of data, one per chunk
# maybe faster to use struct and iter than numpy, since we iterate anyway
dt = [(f"ind{i}", ">u4") for i in range(ndims)] + [
("tag", ">u2"),
("ref", ">u2"),
]
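        # one row per chunk: its coordinates along each dimension, then the
        # tag/ref pair of the element holding that chunk's bytes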
rows = np.frombuffer(data, dtype=dt, count=header["nvert"])
# rows["tag"] should always be 61 -> CHUNK
refs = []
for *ind, tag, ref in rows:
# maybe ind needs reversing since everything is FORTRAN
chunk_tag = self.tags[("CHUNK", ref)]
if chunk_tag["extended"]:
self.f.seek(chunk_tag["offset"])
# these are always COMP?
ctype, offset, length = self._dec_extended()
refs.append([".".join(str(_) for _ in ind), offset, length, ctype])
else:
refs.append(
[
".".join(str(_) for _ in ind),
chunk_tag["offset"],
chunk_tag["length"],
]
)
refs.append(dims)
return refs
def _dec_comp(self):
# version = self.read_int(2) # always 0
# len_uncomp = self.read_int(4)
self.f.seek(6, 1)
data_ref = self.read_int(2)
# model = self.read_int(2) # always 0
ctype = "DEFLATE" # comp[self.read_int(2)]
tag = self.tags[("COMPRESSED", data_ref)]
return ctype, tag["offset"], tag["length"]
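
    # tag-specific decoders, registered by name and dispatched from decode()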
@reg("NDG")
def _dec_ndg(self, info):
if "tags" not in info:
return {
"tags": [
(tags[self.read_int(2)], self.read_int(2))
for _ in range(0, info["length"], 4)
]
}
return info["tags"]
@reg("SDD")
def _dec_sdd(self, info):
rank = self.read_int(2)
dims = [self.read_int(4) for _ in range(rank)]
data_tag = (tags[self.read_int(2)], self.read_int(2))
scale_tags = [(tags[self.read_int(2)], self.read_int(2)) for _ in range(rank)]
return _pl(locals())
@reg("VERSION")
def _dec_version(self, info):
return {
"major": self.read_int(4),
"minor": self.read_int(4),
"release": self.read_int(4),
"string:": _null_str(self.f.read(info["length"] - 10).decode()),
}
@reg("VH")
def _dec_vh(self, info):
# virtual group ("table") header
interface = self.read_int(2)
nvert = self.read_int(4)
ivsize = self.read_int(2)
nfields = self.read_int(2)
types = [self.read_int(2) for _ in range(nfields)]
isize = [self.read_int(2) for _ in range(nfields)]
offsets = [self.read_int(2) for _ in range(nfields)]
order = [self.read_int(2) for _ in range(nfields)]
names = [
self.f.read(self.read_int(2)).split(b"\x00", 1)[0].decode()
for _ in range(nfields)
]
namelen = self.read_int(2)
name = self.f.read(namelen).split(b"\x00", 1)[0].decode()
classlen = self.read_int(2)
cls = self.f.read(classlen).split(b"\x00", 1)[0].decode()
ref = (self.read_int(2), self.read_int(2))
return _pl(locals())
@reg("VG")
def _dec_vg(self, info):
nelt = self.read_int(2)
tag = [tags[self.read_int(2)] for _ in range(nelt)]
refs = [self.read_int(2) for _ in range(nelt)]
name = self.f.read(self.read_int(2)).split(b"\x00", 1)[0].decode()
cls = self.f.read(self.read_int(2)).split(b"\x00", 1)[0].decode()
return _pl(locals())
@reg("NT")
def _dec_nt(self, info):
version, typ, width, cls = list(self.f.read(4))
typ = dtypes[typ]
return _pl(locals())
def _null_str(s):
    # truncate at the first NUL byte
    return s.split("\x00", 1)[0]
def _pl(d):
    # filter a locals() dict down to just the decoded fields
    return {k: v for k, v in d.items() if k not in {"info", "f", "self"}}
# hdf/src/htags.h
tags = {
1: "NULL",
20: "LINKED",
30: "VERSION",
40: "COMPRESSED",
50: "VLINKED",
51: "VLINKED_DATA",
60: "CHUNKED",
61: "CHUNK",
100: "FID",
101: "FD",
102: "TID",
103: "TD",
104: "DIL",
105: "DIA",
106: "NT",
107: "MT",
108: "FREE",
200: "ID8",
201: "IP8",
202: "RI8",
203: "CI8",
204: "II8",
300: "ID",
301: "LUT",
302: "RI",
303: "CI",
304: "NRI",
306: "RIG",
307: "LD",
308: "MD",
309: "MA",
310: "CCN",
311: "CFM",
312: "AR",
400: "DRAW",
401: "RUN",
500: "XYP",
501: "MTO",
602: "T14",
603: "T105",
700: "SDG",
701: "SDD",
702: "SD",
703: "SDS",
704: "SDL",
705: "SDU",
706: "SDF",
707: "SDM",
708: "SDC",
709: "SDT",
710: "SDLNK",
720: "NDG",
721: "RESERVED",
# "Objects of tag 721 are never actually written to the file. The tag is
# needed to make things easier mixing DFSD and SD style objects in the same file"
731: "CAL",
732: "FV",
799: "BREQ",
781: "SDRAG",
780: "EREQ",
1965: "VG",
1962: "VH",
1963: "VS",
11: "RLE",
12: "IMCOMP",
13: "JPEG",
14: "GREYJPEG",
15: "JPEG5",
16: "GREYJPEG5",
}
spec = {
1: "LINKED",
2: "EXT",
3: "COMP",
4: "VLINKED",
5: "CHUNKED",
6: "BUFFERED",
7: "COMPRAS",
}
# hdf4/hdf/src/hntdefs.h
dtypes = {
5: "f4",
6: "f8",
20: "i1",
21: "u1",
4: "str", # special case, size given in header
22: ">i2",
23: ">u2",
24: ">i4",
25: ">u4",
26: ">i8",
27: ">u8",
}
# hdf4/hdf/src/hcomp.h
comp = {
0: "NONE",
1: "RLE",
2: "NBIT",
3: "SKPHUFF",
4: "DEFLATE", # called deflate, but code says "gzip" and doc says "GNU zip"; actually zlib?
# see codecs.ZlibCodec
5: "SZIP",
7: "JPEG",
}
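
# End-to-end sketch of opening the generated references with xarray; the path
# is hypothetical and xarray/zarr must be installed:
#
#     import xarray as xr
#     refs = HDF4ToZarr("data/sample.hdf").translate()
#     ds = xr.open_dataset(
#         "reference://",
#         engine="zarr",
#         backend_kwargs={"storage_options": {"fo": refs}, "consolidated": False},
#     )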