marulc
Maritime Unpack-Lookup-Convert
A library for parsing and unparsing (future feature) of maritime message formats. Currently supports:
- NMEA0183
- NMEA2000
Main features:
- Parsing NMEA0183 sentences to python dictionaries
- Parsing and decoding NMEA2000 binary messages to python dictionaries
- Support for NMEA2000 messages wrapped in NMEA0183 sentences (``--PGN``-sentences)
- Support for multi-packet NMEA2000 messages (fast-type messages)
Since everything is parsed and decoded into regular python dictionaries, serialization to JSON format is very simple.
Documentation: https://mo-rise.github.io/marulc/
Definitions for parsing and decoding
For NMEA0183, definitions have been extracted from the class-based hierarchy of pynmea2 and copmiled into a JSON definition. It can be found here. The script for extracting these definitions from the pynmea2 source code is available in the scripts
-folder.
For NMEA2000, definitions are identical to what is being used in the CANBOAT project. The definitions can be found here.
Installation
From pypi:
pip install marulc
Example usage
Single NMEA0183 sentence using standard sentence library
from marulc import unpack_nmea0183_message
msg_as_dict = unpack_nmea0183_message("$GNGGA,122203.19,5741.1549,N,01153.1748,E,4,37,0.5,4.03,M,35.78,M,,*72")
Single NMEA0183 sentence wrapping a N2K message using custom formatter
from marulc import NMEA0183Parser
from marulc.custom_parsers.PCDIN import PCDINFormatter
parser = NMEA0183Parser([PCDINFormatter()])
msg_as_dict = parser.unpack(
"$PCDIN,01F201,001935D5,38,0000000B0C477CBC0C0000FFFFFFFFFFFF30007F000000000000*26"
)
Parse from iterator
from marulc import NMEA0183Parser, parse_from_iterator
example_data = [
"$YDGLL,5741.1612,N,01153.1447,E,110759.00,A,A*6B",
"$YDRMC,110759.00,A,5741.1612,N,01153.1447,E,0.0,300.0,010170,,E,A,C*72",
"$YDRPM,E,0,0.0,,A*64",
"$YDRPM,E,1,0.0,,A*65",
"$YDROT,-0.6,A*10",
"$YDHDG,0.0,0.0,E,,*3F",
"$YDHDM,0.0,M*3F",
"$YDRSA,-0.1,A,,V*48",
"$YDVTG,328.0,T,328.0,M,0.0,N,0.0,K,A*29"
]
parser = NMEA0183Parser()
for unpacked_msg in parse_from_iterator(parser, example_data, quiet=True):
print(unpacked_msg)
NMEA2000 frames
from marulc import NMEA2000Parser
parser = NMEA2000Parser()
# Unpack a single frame message
# Note: This will only work for single-frame N2K messages. For multi-frame messages, the unpack
# method will raise a `MultiPacketInProcessError` and expect further frames to be provided
msg_as_dict = parser.unpack("09F10D0A FF 00 00 00 FF 7F FF FF")
# For unpacking multi-frame messages, its usually better to use a `parse_from_iterator` setup, such as:
from marulc import parse_from_iterator
multi_frame_message = [
"09F201B7 C0 1A 01 FF FF FF FF B0",
"09F201B7 C1 81 3C 05 00 00 B0 BA",
"09F201B7 C2 1C 00 FF FF FF FF FF",
"09F201B7 C3 00 00 00 00 7F 7F FF",
]
for full_message in parse_from_iterator(parser, multi_frame_message, quiet=True):
print(full_message)
Filter for specific messages
from marulc import NMEA0183Parser, parse_from_iterator
from marulc.utils import filter_on_talker_formatter
example_data = [
"$YDGLL,5741.1612,N,01153.1447,E,110759.00,A,A*6B",
"$YDRMC,110759.00,A,5741.1612,N,01153.1447,E,0.0,300.0,010170,,E,A,C*72",
"$YDRPM,E,0,0.0,,A*64",
"$YDRPM,E,1,0.0,,A*65",
"$YDROT,-0.6,A*10",
"$YDHDG,0.0,0.0,E,,*3F",
"$YDHDM,0.0,M*3F",
"$YDRSA,-0.1,A,,V*48",
"$YDVTG,328.0,T,328.0,M,0.0,N,0.0,K,A*29"
]
parser = NMEA0183Parser()
iterator_all = parse_from_iterator(parser, example_data, quiet=True)
rpm_sentences = list(filter(filter_on_talker_formatter("..RPM"), iterator_all))
assert len(rpm_sentences) == 2
Extract specific value from specific messages
from marulc import NMEA2000Parser, parse_from_iterator
from marulc.utils import filter_on_pgn, deep_get
example_data = [
"08FF12C9 4A 9A 00 17 DB 00 00 00",
"08FF13C9 4A 9A 00 00 FF FF FF FF",
"08FF14C9 4A 9A 00 00 00 00 00 FF",
"09F200C9 00 57 30 FF FF 01 FF FF",
"09F205C9 00 FC FF FF FF FF 00 FF",
"09F10DE5 00 F8 FF 7F F9 FE FF FF",
"09F11365 DA AB 4B FE FF FF FF FF",
"08FF12B7 4A 9A 01 17 DB 00 00 00",
"08FF13B7 4A 9A 01 00 FF FF FF FF",
"08FF14B7 4A 9A 01 00 00 00 00 FF",
"09F200B7 01 DA 2F FF FF 01 FF FF",
"09F205B7 01 FC FF FF FF FF 00 FF",
]
parser = NMEA2000Parser()
iterator_all = parse_from_iterator(parser, example_data, quiet=True)
speeds = []
for filtered_unpacked_msg in filter(filter_on_pgn(127488), iterator_all):
speed = deep_get(filtered_unpacked_msg, "Fields", "speed")
speeds.append(speed)
assert len(speeds) == 2
Extraction using JSON pointers
Requires the jsonpointer
package (pip install jsonpointer
)
from jsonpointer import resolve_pointer
from marulc import NMEA2000Parser, parse_from_iterator
from marulc.utils import filter_on_pgn, deep_get
example_data = [
"08FF12C9 4A 9A 00 17 DB 00 00 00",
"08FF13C9 4A 9A 00 00 FF FF FF FF",
"08FF14C9 4A 9A 00 00 00 00 00 FF",
"09F200C9 00 57 30 FF FF 01 FF FF",
"09F205C9 00 FC FF FF FF FF 00 FF",
"09F10DE5 00 F8 FF 7F F9 FE FF FF",
"09F11365 DA AB 4B FE FF FF FF FF",
"08FF12B7 4A 9A 01 17 DB 00 00 00",
"08FF13B7 4A 9A 01 00 FF FF FF FF",
"08FF14B7 4A 9A 01 00 00 00 00 FF",
"09F200B7 01 DA 2F FF FF 01 FF FF",
"09F205B7 01 FC FF FF FF FF 00 FF",
]
parser = NMEA2000Parser()
iterator_all = parse_from_iterator(parser, example_data, quiet=True)
speeds = []
for filtered_unpacked_msg in filter(filter_on_pgn(127488), iterator_all):
speed = resolve_pointer(filtered_unpacked_msg, "/Fields/speed")
speeds.append(speed)
assert len(speeds) == 2
Development setup
The repository includes a devcontainer setup which is the recommended way of creating a development environment. See here for a generic get-started in VSCode.
Run the formatter and linter:
black marulc tests
pylint marulc
Run the tests:
pytest --codeblocks
License
See LICENSE
API Reference
Package (marulc
)
Maritime Unpack-Lookup-Convert
Submodule (marulc.nmea0183
)
Containing functionality for unpacking textual NMEA0183 messages
- class marulc.nmea0183.NMEA0183Parser(custom_formatters: Sequence[Type[NMEA0183FormatterBase]] | None = None)
Bases:
RawParserBase
A parser for parsing raw NMEA0183 strings
- unpack(msg: str) dict
Unpack a message using this parser
- Parameters:
msg (Any) – Message to be unpacked
- marulc.nmea0183.calculate_checksum(nmea_str: str) int
Calculate checksum from inputted raw nmea string
- Parameters:
nmea_str (str) – Raw received nmea string
- Returns:
Calculated checksum
- Return type:
int
- marulc.nmea0183.get_description_for_sentence_formatter(sentence_formatter: str) dict
Get the description and template for this sentence formatter :param sentence_formatter: Sentence formatter :type sentence_formatter: str
- Returns:
Description
- Return type:
dict
- marulc.nmea0183.parse_value(value: str) str | int | float
Parses a value to either str, int or float depending on format
- Parameters:
value (str) – Inputted raw string
- Returns:
Parsed output
- Return type:
Union[str, int, float]
- marulc.nmea0183.unpack_nmea0183_message(line: str, standard_custom_formatters: Dict[str, Callable] | None = None, proprietary_custom_formatters: Dict[str, Callable] | None = None) dict
Parses a string representing a NMEA 0183 sentence, and returns a python dictionary with the unpacked sentence
- Parameters:
line (str) – Raw NMEA0183 sentence
standard_custom_formatters (Optional[Dict[str, Callable]]) – Dict with custom sentence formatters. Keys are sentence formatter strings (ex. ‘PGN’) and values are callables returning a parsed message for the specific sentence formatter.
proprietary_custom_formatters (Optional[Dict[str, Callable]]) – Dict with custom sentence formatters. Keys are sentence formatter strings (ex. ‘PGN’) and values are callables returning a parsed message for the specific sentence formatter.
- Raises:
ParseError – If parsing of message fails
ChecksumError – If checksum does not match
SentenceTypeError – If the inputted NMEA sentence is of a type that is not supported
- Returns:
Complete unpacked message
- Return type:
dict
- marulc.nmea0183.unpack_using_definition(definition: dict, data: list) dict
Unpack a list of data elements using the provided definition
- Parameters:
definition (dict) – Definition describing how the data should be interpreted
data (list) – Raw data elements
- Returns:
Unpacked data including parsed values and descriptions
- Return type:
dict
- marulc.nmea0183.unpack_using_proprietary(manufacturer: str, data: str) dict
Unpack a raw, proprietary message based on knowledge about the manufacturer
- Parameters:
manufacturer (str) – Manufacturer acronym
data (str) – Raw data elements
- Raises:
ParseError – If a definition could not be found for this proprietary message
- Returns:
Unpacked data including parsed values and descriptions
- Return type:
dict
Submodule (marulc.nmea2000
)
Containing functionality for unpacking binary n2k messages according to PGN-specific definitions
- class marulc.nmea2000.NMEA2000Parser
Bases:
RawParserBase
A parser for parsing raw NMEA2000 CAN frames in hex format, example:
09F201C9 41823C050000C0C8 09F201C9 421C00FFFFFFFFFF 09F201C9 43000000007F7FFF 09F80265 79FC77BA0000FFFF 09F10DE5 00F8FF7FA6F8FFFF 09FE1065 1F1FFFFFFFFFFFFF 09F11365 7A849E0100FFFFFF 08FF12B7 4A9A011781003200 08FF13B7 4A9A0100FFFFFFFF 08FF14B7 4A9A0100000000FF 09F200B7 010000FFFF00FFFF 09F205B7 01FDFFFFFFFF00FF 08FF12C9 4A9A001781003C00 08FF13C9 4A9A0000FFFFFFFF 08FF14C9 4A9A0000000000FF
- unpack(frame: str) dict
Unpack a message using this parser
- Parameters:
msg (Any) – Message to be unpacked
- marulc.nmea2000.get_description_for_pgn(pgn: int) dict
Get the description and template for this pgn in the format of a python dictionary
- Parameters:
pgn (int) – PGN number
- Returns:
Description
- Return type:
dict
- marulc.nmea2000.packet_field_decoder(pgn: int) CompiledFormat
Returns a pre-compiled bit field decoder for the message definition associated with this PGn number
- Parameters:
pgn (int) – PGN number
- Returns:
Pre-compiled bit field decoder
- Return type:
bitstruct.CompiledFormat
- marulc.nmea2000.packet_total_length(pgn: int) int
Returns the total length of the binary message associated with this PGN number
- Parameters:
pgn (int) – PGN number
- Returns:
Total length (number of bytes)
- Return type:
int
- marulc.nmea2000.packet_type(pgn: int) str
Return the packet type associated with this PGN number
- Parameters:
pgn (int) – PGN number
- Returns:
Packet type
- Return type:
str
- marulc.nmea2000.process_sub_packet(pgn: int, address: int, data: bytearray, bucket: dict)
Process a single subpacket part of a multi-packet n2k message. The following description of the protocol is taken from CANBOAT:
NMEA 2000 uses the 8 ‘data’ bytes as follows for fast packet type: data[0] is an ‘order’ that increments, or not (depending a bit on implementation). If the size of the packet <= 7 then the data follows in data[1..7] If the size of the packet > 7 then the next byte data[1] is the size of the payload and data[0] is divided into 5 bits index into the fast packet, and 3 bits ‘order that increases. This means that for ‘fast packets’ the first bucket (sub-packet) contains 6 payload bytes and 7 for remaining. Since the max index is 31, the maximal payload is 6 + 31 * 7 = 223 bytes
- Parameters:
pgn (int) – PGN number
address (int) – Source address of message
data (bytearray) – Raw binary packet data
bucket (dict) – Reference to temporary storage for partly parsed messages
- Raises:
MultiPacketDiscardedError – If this subpacket is discarded due to missing messages
MultiPacketInProcessError – If this subpacket has been processed successfully but we require more subpackets to be able to decode the full message
- Returns:
Complete, raw binary message stitched together from multiple subpackets
- Return type:
bytearray
- marulc.nmea2000.unpack_complete_message(pgn: int, data: bytearray) dict
Unpack a complete n2k message associated with this PGN number
- Parameters:
pgn (int) – PGN number
data (bytearray) – Complete, raw binary message as a bytearray
- Returns:
Unpacked message as a python dictionary
- Return type:
dict
- marulc.nmea2000.unpack_fields(pgn: int, data: bytearray) dict
Unpack all fields of a complete binary message into a python dictionary
- Parameters:
pgn (int) – PGN number
data (bytearray) – Complete, raw binary message as a bytearray
- Returns:
Unpacked fields as a python dictionary
- Return type:
dict
- marulc.nmea2000.unpack_header(header: bytearray)
Unpack the N2K header into priority, PGN number and source address. See https://www.kvaser.com/about-can/higher-layer-protocols/j1939-introduction/ for more details.
- Parameters:
header (bytearray) – The N2K header as a bytearray
- Returns:
(Source address, PGN, priority)
- Return type:
tuple (int, int, int)
Submodule (marulc.utils
)
Utility functions
- marulc.utils.deep_get(dikt: dict, *keys: str, default: Any = None) Any
A “deep getter” for nested dictionaries
from marulc.utils import deep_get d = {"A": {"B":{"C":"3}}} deep_get(d, "A", "B", "C") # Returns 3 deep_get(d, "A", "B", "C", "D") # Returns None deep_get(d, "A", "B", "C", "D", default=89) # Returns 89
- Parameters:
dikt (dict) – Nested dictionary to operate on
*keys (str) – Any number of keys in descending level in hte nested dictionary.
default (Any, optional) – Default value to return if chain of keys cant be followed. Defaults to None.
- Returns:
The value found at the bottom of the keys or the default value.
- Return type:
Any
- marulc.utils.filter_on_pgn(*PGNs: int) Callable[[dict], bool]
Create filter that filters on specific PGN numbers
from marulc import parse_from_iterator from marulc.utils import filter_on_pgn source = open("nmea_log.txt") filtered = filter_on_pgn(127488)(parse_from_iterator(source))
- Parameters:
*PGNs (str) – Any number of PGN numbers that will be matched against a “PGN” key.
- Returns:
A pre-loaded filter callable
- Return type:
Callable[[dict], bool]
- marulc.utils.filter_on_talker_formatter(*regexes: str) Callable[[dict], bool]
Create filter that filters on specific talkers and/or sentence formatters
from marulc import parse_from_iterator from marulc.utils import filter_on_talker_formatter source = open("nmea_log.txt") filtered = filter_on_talker_formatter("..GGA", "PASHR")(parse_from_iterator(source))
- Parameters:
*regexes (str) – Any number of strings or regex expressions that will be matched against a “Talker” and “Formatter” key combination.
- Returns:
A pre-loaded filter callable
- Return type:
Callable[[dict], bool]
- marulc.utils.parse_from_iterator(parser: Type[RawParserBase], source: Iterable[str], quiet=False) Iterable[dict]
Helper function for unpacking NMEA sentences from an iterable source
- Parameters:
parser (Type[RawParserBase]) – Parser conforming to the RawParser interface.
source (Iterable[str]) – Iterable source which yields NMEA 0183 sentences
quiet (bool, optional) – Whether exceptions encountered should be raised or silenced. Defaults to False.
- Yields:
Iterator[Iterable[dict]] –
- The next, complete, unpacked message as a python
dictionary.
Submodule (marulc.exceptions
)
Custom exceptions
- exception marulc.exceptions.ChecksumError(message, data)
Bases:
ParseError
- exception marulc.exceptions.MultiPacketDiscardedError
Bases:
MultiPacketError
- exception marulc.exceptions.MultiPacketError
Bases:
RuntimeError
- exception marulc.exceptions.MultiPacketInProcessError
Bases:
MultiPacketError
- exception marulc.exceptions.PGNError(message, data)
Bases:
ParseError
- exception marulc.exceptions.ParseError(message, data)
Bases:
ValueError
- exception marulc.exceptions.SentenceTypeError(message, data)
Bases:
ParseError