marulc

Maritime Unpack-Lookup-Convert

A library for parsing and unparsing (future feature) of maritime message formats. Currently supports:

- NMEA0183
- NMEA2000

Main features:

- Parsing NMEA0183 sentences to python dictionaries
- Parsing and decoding NMEA2000 binary messages to python dictionaries
- Support for NMEA2000 messages wrapped in NMEA0183 sentences (``--PGN``-sentences)
- Support for multi-packet NMEA2000 messages (fast-type messages)

Since everything is parsed and decoded into regular python dictionaries, serialization to JSON format is very simple.
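As a minimal sketch of that serialization step: the dictionary below is a hand-written stand-in for an unpacked message (its keys and values are illustrative assumptions, not actual marulc output), and the standard library json module does the rest.

```python
import json

# Hand-written stand-in for an unpacked message. The key names and values
# are illustrative assumptions, not output produced by marulc.
unpacked_msg = {
    "PGN": 127488,
    "Fields": {"speed": 3096.5},
}

# Plain dictionaries holding str/int/float values serialize directly.
as_json = json.dumps(unpacked_msg)

# Reading the JSON back gives an equal dictionary.
restored = json.loads(as_json)
```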

Documentation: https://mo-rise.github.io/marulc/

Definitions for parsing and decoding

For NMEA0183, definitions have been extracted from the class-based hierarchy of pynmea2 and compiled into a JSON definition. It can be found here. The script for extracting these definitions from the pynmea2 source code is available in the scripts folder.

For NMEA2000, definitions are identical to what is being used in the CANBOAT project. The definitions can be found here.

Installation

From pypi:

pip install marulc

Example usage

Single NMEA0183 sentence using standard sentence library

from marulc import unpack_nmea0183_message

msg_as_dict = unpack_nmea0183_message("$GNGGA,122203.19,5741.1549,N,01153.1748,E,4,37,0.5,4.03,M,35.78,M,,*72")

Single NMEA0183 sentence wrapping a N2K message using custom formatter

from marulc import NMEA0183Parser
from marulc.custom_parsers.PCDIN import PCDINFormatter

parser = NMEA0183Parser([PCDINFormatter()])

msg_as_dict = parser.unpack(
    "$PCDIN,01F201,001935D5,38,0000000B0C477CBC0C0000FFFFFFFFFFFF30007F000000000000*26"
)

Parse from iterator

from marulc import NMEA0183Parser, parse_from_iterator

example_data = [
    "$YDGLL,5741.1612,N,01153.1447,E,110759.00,A,A*6B",
    "$YDRMC,110759.00,A,5741.1612,N,01153.1447,E,0.0,300.0,010170,,E,A,C*72",
    "$YDRPM,E,0,0.0,,A*64",
    "$YDRPM,E,1,0.0,,A*65",
    "$YDROT,-0.6,A*10",
    "$YDHDG,0.0,0.0,E,,*3F",
    "$YDHDM,0.0,M*3F",
    "$YDRSA,-0.1,A,,V*48",
    "$YDVTG,328.0,T,328.0,M,0.0,N,0.0,K,A*29"
]

parser = NMEA0183Parser()

for unpacked_msg in parse_from_iterator(parser, example_data, quiet=True):
    print(unpacked_msg)

NMEA2000 frames

from marulc import NMEA2000Parser

parser = NMEA2000Parser()

# Unpack a single frame message
# Note: This will only work for single-frame N2K messages. For multi-frame messages, the unpack
# method will raise a `MultiPacketInProcessError` and expect further frames to be provided
msg_as_dict = parser.unpack("09F10D0A FF 00 00 00 FF 7F FF FF")

# For unpacking multi-frame messages, it's usually better to use a `parse_from_iterator` setup, such as:
from marulc import parse_from_iterator

multi_frame_message = [
    "09F201B7 C0 1A 01 FF FF FF FF B0",
    "09F201B7 C1 81 3C 05 00 00 B0 BA",
    "09F201B7 C2 1C 00 FF FF FF FF FF",
    "09F201B7 C3 00 00 00 00 7F 7F FF",
]

for full_message in parse_from_iterator(parser, multi_frame_message, quiet=True):
    print(full_message)

Filter for specific messages

from marulc import NMEA0183Parser, parse_from_iterator
from marulc.utils import filter_on_talker_formatter

example_data = [
    "$YDGLL,5741.1612,N,01153.1447,E,110759.00,A,A*6B",
    "$YDRMC,110759.00,A,5741.1612,N,01153.1447,E,0.0,300.0,010170,,E,A,C*72",
    "$YDRPM,E,0,0.0,,A*64",
    "$YDRPM,E,1,0.0,,A*65",
    "$YDROT,-0.6,A*10",
    "$YDHDG,0.0,0.0,E,,*3F",
    "$YDHDM,0.0,M*3F",
    "$YDRSA,-0.1,A,,V*48",
    "$YDVTG,328.0,T,328.0,M,0.0,N,0.0,K,A*29"
]

parser = NMEA0183Parser()

iterator_all = parse_from_iterator(parser, example_data, quiet=True)

rpm_sentences = list(filter(filter_on_talker_formatter("..RPM"), iterator_all))
assert len(rpm_sentences) == 2

Extract specific value from specific messages

from marulc import NMEA2000Parser, parse_from_iterator
from marulc.utils import filter_on_pgn, deep_get

example_data = [
    "08FF12C9 4A 9A 00 17 DB 00 00 00",
    "08FF13C9 4A 9A 00 00 FF FF FF FF",
    "08FF14C9 4A 9A 00 00 00 00 00 FF",
    "09F200C9 00 57 30 FF FF 01 FF FF",
    "09F205C9 00 FC FF FF FF FF 00 FF",
    "09F10DE5 00 F8 FF 7F F9 FE FF FF",
    "09F11365 DA AB 4B FE FF FF FF FF",
    "08FF12B7 4A 9A 01 17 DB 00 00 00",
    "08FF13B7 4A 9A 01 00 FF FF FF FF",
    "08FF14B7 4A 9A 01 00 00 00 00 FF",
    "09F200B7 01 DA 2F FF FF 01 FF FF",
    "09F205B7 01 FC FF FF FF FF 00 FF",
]

parser = NMEA2000Parser()

iterator_all = parse_from_iterator(parser, example_data, quiet=True)

speeds = []
for filtered_unpacked_msg in filter(filter_on_pgn(127488), iterator_all):
    speed = deep_get(filtered_unpacked_msg, "Fields", "speed")
    speeds.append(speed)

assert len(speeds) == 2

Extraction using JSON pointers

Requires the jsonpointer package (pip install jsonpointer)

from jsonpointer import resolve_pointer

from marulc import NMEA2000Parser, parse_from_iterator
from marulc.utils import filter_on_pgn

example_data = [
    "08FF12C9 4A 9A 00 17 DB 00 00 00",
    "08FF13C9 4A 9A 00 00 FF FF FF FF",
    "08FF14C9 4A 9A 00 00 00 00 00 FF",
    "09F200C9 00 57 30 FF FF 01 FF FF",
    "09F205C9 00 FC FF FF FF FF 00 FF",
    "09F10DE5 00 F8 FF 7F F9 FE FF FF",
    "09F11365 DA AB 4B FE FF FF FF FF",
    "08FF12B7 4A 9A 01 17 DB 00 00 00",
    "08FF13B7 4A 9A 01 00 FF FF FF FF",
    "08FF14B7 4A 9A 01 00 00 00 00 FF",
    "09F200B7 01 DA 2F FF FF 01 FF FF",
    "09F205B7 01 FC FF FF FF FF 00 FF",
]

parser = NMEA2000Parser()

iterator_all = parse_from_iterator(parser, example_data, quiet=True)

speeds = []
for filtered_unpacked_msg in filter(filter_on_pgn(127488), iterator_all):
    speed = resolve_pointer(filtered_unpacked_msg, "/Fields/speed")
    speeds.append(speed)

assert len(speeds) == 2

Development setup

The repository includes a devcontainer setup which is the recommended way of creating a development environment. See here for a generic get-started in VSCode.

Run the formatter and linter:

black marulc tests
pylint marulc

Run the tests:

pytest --codeblocks

License

See LICENSE

API Reference

Package (marulc)

Maritime Unpack-Lookup-Convert

Submodule (marulc.nmea0183)

Containing functionality for unpacking textual NMEA0183 messages

class marulc.nmea0183.NMEA0183Parser(custom_formatters: Sequence[Type[NMEA0183FormatterBase]] | None = None)

Bases: RawParserBase

A parser for parsing raw NMEA0183 strings

unpack(msg: str) dict

Unpack a message using this parser

Parameters:

msg (str) – Message to be unpacked

marulc.nmea0183.calculate_checksum(nmea_str: str) int

Calculate checksum from inputted raw nmea string

Parameters:

nmea_str (str) – Raw received nmea string

Returns:

Calculated checksum

Return type:

int
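For reference, the NMEA 0183 checksum is the XOR of all characters between the leading "$" and the "*". The function below is a stand-alone sketch of that rule. The name xor_checksum is hypothetical, and the sketch is not necessarily how calculate_checksum is implemented or what input it expects.

```python
def xor_checksum(sentence: str) -> int:
    """XOR all characters between the leading '$' (or '!') and the '*'."""
    body = sentence.strip()
    if body[:1] in ("$", "!"):
        body = body[1:]
    # Drop the '*' and the two checksum digits, if present.
    body = body.split("*", 1)[0]
    checksum = 0
    for char in body:
        checksum ^= ord(char)
    return checksum


# Sentences taken from the examples earlier in this document.
hdm_checksum = xor_checksum("$YDHDM,0.0,M*3F")
rot_checksum = xor_checksum("$YDROT,-0.6,A*10")
```

Both results equal the two hex digits that follow the "*" in the respective sentence (0x3F and 0x10).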

marulc.nmea0183.get_description_for_sentence_formatter(sentence_formatter: str) dict

Get the description and template for this sentence formatter

Parameters:

sentence_formatter (str) – Sentence formatter

Returns:

Description

Return type:

dict

marulc.nmea0183.parse_value(value: str) str | int | float

Parses a value to either str, int or float depending on format

Parameters:

value (str) – Inputted raw string

Returns:

Parsed output

Return type:

Union[str, int, float]
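One common way to get this behaviour is to try the narrowest conversion first and fall back step by step. The sketch below assumes that approach. The name parse_value_sketch is hypothetical, and the exact rules used by parse_value may differ.

```python
from typing import Union


def parse_value_sketch(value: str) -> Union[str, int, float]:
    """Return an int if possible, else a float, else the unchanged string."""
    for convert in (int, float):
        try:
            return convert(value)
        except ValueError:
            continue
    return value


satellites = parse_value_sketch("37")   # int
hdop = parse_value_sketch("0.5")        # float
hemisphere = parse_value_sketch("N")    # stays a str
empty = parse_value_sketch("")          # empty fields stay empty strings
```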

marulc.nmea0183.unpack_nmea0183_message(line: str, standard_custom_formatters: Dict[str, Callable] | None = None, proprietary_custom_formatters: Dict[str, Callable] | None = None) dict

Parses a string representing a NMEA 0183 sentence, and returns a python dictionary with the unpacked sentence

Parameters:
  • line (str) – Raw NMEA0183 sentence

  • standard_custom_formatters (Optional[Dict[str, Callable]]) – Dict with custom sentence formatters. Keys are sentence formatter strings (ex. ‘PGN’) and values are callables returning a parsed message for the specific sentence formatter.

  • proprietary_custom_formatters (Optional[Dict[str, Callable]]) – Dict with custom sentence formatters. Keys are sentence formatter strings (ex. ‘PGN’) and values are callables returning a parsed message for the specific sentence formatter.

Returns:

Complete unpacked message

Return type:

dict

marulc.nmea0183.unpack_using_definition(definition: dict, data: list) dict

Unpack a list of data elements using the provided definition

Parameters:
  • definition (dict) – Definition describing how the data should be interpreted

  • data (list) – Raw data elements

Returns:

Unpacked data including parsed values and descriptions

Return type:

dict

marulc.nmea0183.unpack_using_proprietary(manufacturer: str, data: str) dict

Unpack a raw, proprietary message based on knowledge about the manufacturer

Parameters:
  • manufacturer (str) – Manufacturer acronym

  • data (str) – Raw data elements

Raises:

ParseError – If a definition could not be found for this proprietary message

Returns:

Unpacked data including parsed values and descriptions

Return type:

dict

Submodule (marulc.nmea2000)

Containing functionality for unpacking binary n2k messages according to PGN-specific definitions

class marulc.nmea2000.NMEA2000Parser

Bases: RawParserBase

A parser for parsing raw NMEA2000 CAN frames in hex format, example:

09F201C9 41823C050000C0C8
09F201C9 421C00FFFFFFFFFF
09F201C9 43000000007F7FFF
09F80265 79FC77BA0000FFFF
09F10DE5 00F8FF7FA6F8FFFF
09FE1065 1F1FFFFFFFFFFFFF
09F11365 7A849E0100FFFFFF
08FF12B7 4A9A011781003200
08FF13B7 4A9A0100FFFFFFFF
08FF14B7 4A9A0100000000FF
09F200B7 010000FFFF00FFFF
09F205B7 01FDFFFFFFFF00FF
08FF12C9 4A9A001781003C00
08FF13C9 4A9A0000FFFFFFFF
08FF14C9 4A9A0000000000FF

unpack(frame: str) dict

Unpack a message using this parser

Parameters:

frame (str) – Message to be unpacked

marulc.nmea2000.get_description_for_pgn(pgn: int) dict

Get the description and template for this pgn in the format of a python dictionary

Parameters:

pgn (int) – PGN number

Returns:

Description

Return type:

dict

marulc.nmea2000.packet_field_decoder(pgn: int) CompiledFormat

Returns a pre-compiled bit field decoder for the message definition associated with this PGN number

Parameters:

pgn (int) – PGN number

Returns:

Pre-compiled bit field decoder

Return type:

bitstruct.CompiledFormat

marulc.nmea2000.packet_total_length(pgn: int) int

Returns the total length of the binary message associated with this PGN number

Parameters:

pgn (int) – PGN number

Returns:

Total length (number of bytes)

Return type:

int

marulc.nmea2000.packet_type(pgn: int) str

Return the packet type associated with this PGN number

Parameters:

pgn (int) – PGN number

Returns:

Packet type

Return type:

str

marulc.nmea2000.process_sub_packet(pgn: int, address: int, data: bytearray, bucket: dict)

Process a single subpacket part of a multi-packet n2k message. The following description of the protocol is taken from CANBOAT:

NMEA 2000 uses the 8 ‘data’ bytes as follows for fast packet type: data[0] is an ‘order’ that increments, or not (depending a bit on implementation). If the size of the packet <= 7 then the data follows in data[1..7]. If the size of the packet > 7 then the next byte data[1] is the size of the payload and data[0] is divided into 5 bits index into the fast packet, and 3 bits ‘order’ that increases. This means that for ‘fast packets’ the first bucket (sub-packet) contains 6 payload bytes and 7 for the remaining ones. Since the max index is 31, the maximal payload is 6 + 31 * 7 = 223 bytes.

Parameters:
  • pgn (int) – PGN number

  • address (int) – Source address of message

  • data (bytearray) – Raw binary packet data

  • bucket (dict) – Reference to temporary storage for partly parsed messages

Returns:

Complete, raw binary message stitched together from multiple subpackets

Return type:

bytearray
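The fast-packet layout described above can be sketched in a few lines. This is a stand-alone illustration, not the body of process_sub_packet: the name stitch_fast_packet is hypothetical, the frames are assumed to arrive in order for a single PGN and source address, and the index is assumed to sit in the low 5 bits of data[0] (which is consistent with the C0, C1, C2, C3 order bytes in the multi-frame example earlier in this document).

```python
from typing import Iterable

FIRST_FRAME_PAYLOAD = 6  # data[0] holds order/index, data[1] the total size
NEXT_FRAME_PAYLOAD = 7   # only data[0] is taken by order/index
MAX_PAYLOAD = FIRST_FRAME_PAYLOAD + 31 * NEXT_FRAME_PAYLOAD  # 223 bytes


def stitch_fast_packet(frames: Iterable[bytes]) -> bytes:
    """Join the 8-byte sub-packets of one fast-packet message, given in order."""
    payload = bytearray()
    total_length = None
    for expected_index, frame in enumerate(frames):
        # Assumption: the sub-packet index is the low 5 bits of data[0].
        index = frame[0] & 0x1F
        if index != expected_index:
            raise ValueError(f"expected sub-packet {expected_index}, got {index}")
        if index == 0:
            total_length = frame[1]
            payload.extend(frame[2:8])
        else:
            payload.extend(frame[1:8])
    if total_length is None or len(payload) < total_length:
        raise ValueError("message is incomplete")
    # The last sub-packet is padded, so cut at the announced size.
    return bytes(payload[:total_length])


# Data bytes of the multi-frame example shown earlier (header removed).
frames = [
    bytes.fromhex("C0 1A 01 FF FF FF FF B0"),
    bytes.fromhex("C1 81 3C 05 00 00 B0 BA"),
    bytes.fromhex("C2 1C 00 FF FF FF FF FF"),
    bytes.fromhex("C3 00 00 00 00 7F 7F FF"),
]
message = stitch_fast_packet(frames)
```

Here the first sub-packet announces 0x1A = 26 payload bytes, so the four frames carry 6 + 3 * 7 = 27 bytes and the final padding byte is dropped.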

marulc.nmea2000.unpack_complete_message(pgn: int, data: bytearray) dict

Unpack a complete n2k message associated with this PGN number

Parameters:
  • pgn (int) – PGN number

  • data (bytearray) – Complete, raw binary message as a bytearray

Returns:

Unpacked message as a python dictionary

Return type:

dict

marulc.nmea2000.unpack_fields(pgn: int, data: bytearray) dict

Unpack all fields of a complete binary message into a python dictionary

Parameters:
  • pgn (int) – PGN number

  • data (bytearray) – Complete, raw binary message as a bytearray

Returns:

Unpacked fields as a python dictionary

Return type:

dict

marulc.nmea2000.unpack_header(header: bytearray)

Unpack the N2K header into priority, PGN number and source address. See https://www.kvaser.com/about-can/higher-layer-protocols/j1939-introduction/ for more details.

Parameters:

header (bytearray) – The N2K header as a bytearray

Returns:

(Source address, PGN, priority)

Return type:

tuple (int, int, int)
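As a rough illustration of what such a header contains, the sketch below splits a 29-bit CAN identifier following the J1939 layout (3 bits priority, 2 bits reserved/data page, 8 bits PDU format, 8 bits PDU specific, 8 bits source address). The function name split_can_id is hypothetical and it takes an integer rather than a bytearray, so it is not a drop-in replacement for unpack_header.

```python
from typing import Tuple


def split_can_id(can_id: int) -> Tuple[int, int, int]:
    """Return (source address, PGN, priority) for a 29-bit CAN identifier."""
    priority = (can_id >> 26) & 0x7
    data_page = (can_id >> 24) & 0x3      # reserved bit + data page bit
    pdu_format = (can_id >> 16) & 0xFF
    pdu_specific = (can_id >> 8) & 0xFF
    source = can_id & 0xFF
    if pdu_format < 240:
        # Addressed message: PDU specific is the destination, not part of the PGN.
        pgn = (data_page << 16) | (pdu_format << 8)
    else:
        # Broadcast message: PDU specific is the low byte of the PGN.
        pgn = (data_page << 16) | (pdu_format << 8) | pdu_specific
    return source, pgn, priority


# Headers taken from the example frames earlier in this document.
engine_dynamic = split_can_id(0x09F201B7)
engine_rapid = split_can_id(0x09F200C9)
```

The second header decodes to PGN 127488, which is the PGN the filtering examples above select on.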

Submodule (marulc.utils)

Utility functions

marulc.utils.deep_get(dikt: dict, *keys: str, default: Any = None) Any

A “deep getter” for nested dictionaries

from marulc.utils import deep_get

d = {"A": {"B": {"C": "3"}}}
deep_get(d, "A", "B", "C") # Returns "3"
deep_get(d, "A", "B", "C", "D") # Returns None
deep_get(d, "A", "B", "C", "D", default=89) # Returns 89

Parameters:
  • dikt (dict) – Nested dictionary to operate on

  • *keys (str) – Any number of keys in descending level in the nested dictionary.

  • default (Any, optional) – Default value to return if the chain of keys can't be followed. Defaults to None.

Returns:

The value found at the bottom of the keys or the default value.

Return type:

Any
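To make the described behaviour concrete, here is a minimal sketch of a deep getter. The name deep_get_sketch is hypothetical and the actual implementation of deep_get may differ. It walks the keys one level at a time and returns the default as soon as the chain breaks.

```python
from typing import Any


def deep_get_sketch(dikt: dict, *keys: str, default: Any = None) -> Any:
    """Follow keys through nested dictionaries, or return default."""
    current = dikt
    for key in keys:
        # Stop as soon as the current level is not a dict or lacks the key.
        if not isinstance(current, dict) or key not in current:
            return default
        current = current[key]
    return current


nested = {"A": {"B": {"C": 3}}}
found = deep_get_sketch(nested, "A", "B", "C")
missing = deep_get_sketch(nested, "A", "B", "C", "D")
fallback = deep_get_sketch(nested, "A", "B", "C", "D", default=89)
```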

marulc.utils.filter_on_pgn(*PGNs: int) Callable[[dict], bool]

Create filter that filters on specific PGN numbers

from marulc import NMEA2000Parser, parse_from_iterator
from marulc.utils import filter_on_pgn

source = open("nmea_log.txt")
filtered = filter(filter_on_pgn(127488), parse_from_iterator(NMEA2000Parser(), source))

Parameters:

*PGNs (int) – Any number of PGN numbers that will be matched against a “PGN” key.

Returns:

A pre-loaded filter callable

Return type:

Callable[[dict], bool]
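The "pre-loaded filter callable" pattern can be sketched as a small factory that returns a predicate suitable for the built-in filter. The name make_pgn_filter and the hand-written messages are illustrative assumptions. Only the "PGN" key is taken from the description above.

```python
from typing import Callable


def make_pgn_filter(*pgns: int) -> Callable[[dict], bool]:
    """Build a predicate that is true for messages with a matching "PGN" key."""
    wanted = set(pgns)

    def predicate(message: dict) -> bool:
        return message.get("PGN") in wanted

    return predicate


# Hand-written stand-ins for unpacked messages.
messages = [
    {"PGN": 127488, "Fields": {"speed": 1.0}},
    {"PGN": 127489},
    {"Formatter": "GGA"},  # no "PGN" key at all
]
selected = list(filter(make_pgn_filter(127488), messages))
```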

marulc.utils.filter_on_talker_formatter(*regexes: str) Callable[[dict], bool]

Create filter that filters on specific talkers and/or sentence formatters

from marulc import NMEA0183Parser, parse_from_iterator
from marulc.utils import filter_on_talker_formatter

source = open("nmea_log.txt")
filtered = filter(filter_on_talker_formatter("..GGA", "PASHR"), parse_from_iterator(NMEA0183Parser(), source))

Parameters:

*regexes (str) – Any number of strings or regex expressions that will be matched against a “Talker” and “Formatter” key combination.

Returns:

A pre-loaded filter callable

Return type:

Callable[[dict], bool]
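A sketch of how such a regex-based predicate could work is shown below. It assumes that the "Talker" and "Formatter" values are simply concatenated (for example "YD" + "RPM") and matched in full against each pattern. The name make_talker_formatter_filter and the hand-written messages are hypothetical, and the real filter may combine or match the keys differently.

```python
import re
from typing import Callable


def make_talker_formatter_filter(*regexes: str) -> Callable[[dict], bool]:
    """Build a predicate matching on the talker + formatter combination."""
    patterns = [re.compile(regex) for regex in regexes]

    def predicate(message: dict) -> bool:
        # Assumption: talker and formatter are concatenated, e.g. "YDRPM".
        identifier = f"{message.get('Talker', '')}{message.get('Formatter', '')}"
        return any(pattern.fullmatch(identifier) for pattern in patterns)

    return predicate


# Hand-written stand-ins for unpacked messages.
messages = [
    {"Talker": "YD", "Formatter": "RPM"},
    {"Talker": "YD", "Formatter": "ROT"},
    {"PGN": 127488},
]
rpm_only = list(filter(make_talker_formatter_filter("..RPM"), messages))
```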

marulc.utils.parse_from_iterator(parser: Type[RawParserBase], source: Iterable[str], quiet=False) Iterable[dict]

Helper function for unpacking NMEA sentences from an iterable source

Parameters:
  • parser (Type[RawParserBase]) – Parser conforming to the RawParser interface.

  • source (Iterable[str]) – Iterable source which yields NMEA 0183 sentences

  • quiet (bool, optional) – Whether exceptions encountered should be raised or silenced. Defaults to False.

Yields:

Iterator[Iterable[dict]] – The next, complete, unpacked message as a python dictionary.
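The effect of the quiet flag can be illustrated with a small generator. This is a sketch under assumptions: iterate_unpacked and StubParser are hypothetical names, the stub only mimics the unpack method of a parser, and how marulc itself treats multi-packet exceptions is not covered here.

```python
from typing import Iterable, Iterator


class StubParser:
    """Stand-in for a real parser; only the unpack method matters here."""

    def unpack(self, msg: str) -> dict:
        if not msg.startswith("$"):
            raise ValueError(f"not a sentence: {msg!r}")
        return {"raw": msg}


def iterate_unpacked(parser, source: Iterable[str], quiet: bool = False) -> Iterator[dict]:
    """Yield one unpacked message per item that the parser accepts."""
    for raw in source:
        try:
            yield parser.unpack(raw)
        except Exception:
            if not quiet:
                raise
            # In quiet mode the failing item is skipped.


unpacked = list(iterate_unpacked(StubParser(), ["$A", "garbage", "$B"], quiet=True))
```

With quiet left at False, the same input stops at the first item that cannot be unpacked and the exception reaches the caller.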

Submodule (marulc.exceptions)

Custom exceptions

exception marulc.exceptions.ChecksumError(message, data)

Bases: ParseError

exception marulc.exceptions.MultiPacketDiscardedError

Bases: MultiPacketError

exception marulc.exceptions.MultiPacketError

Bases: RuntimeError

exception marulc.exceptions.MultiPacketInProcessError

Bases: MultiPacketError

exception marulc.exceptions.PGNError(message, data)

Bases: ParseError

exception marulc.exceptions.ParseError(message, data)

Bases: ValueError

exception marulc.exceptions.SentenceTypeError(message, data)

Bases: ParseError
