Confluent Kafka File System (API and Schema Registry) - Import
Bridge Requirements
This bridge:requires Internet access to https://repo.maven.apache.org/maven2/ and/or other tool sites to download drivers into <TDC_HOME>/data/download/MIMB/.
Bridge Specifications
Vendor | Confluent |
Tool Name | Kafka File System |
Tool Version | Kafka 2.x |
Tool Web Site | http://kafka.apache.org/ |
Supported Methodology | [File System] Multi-Model, Data Store (NoSQL / Hierarchical, Physical Data Model) via Java API |
Data Profiling | |
Incremental Harvesting | |
Multi-Model Harvesting | |
Remote Repository Browsing for Model Selection |
SPECIFICATIONS
Tool: Confluent / Kafka File System version Kafka 2.x via Java API
See http://kafka.apache.org/
Metadata: [File System] Multi-Model, Data Store (NoSQL / Hierarchical, Physical Data Model)
Component: ApacheKafkaImport.Confluent version 11.2.0
DISCLAIMER
This import bridge requires internet access to download third-party libraries:
- such as https://repo.maven.apache.org/maven2/ to download open source third-party libraries,
- and more sites for other third-party software such as database specific JDBC drivers.
The downloaded third-party libraries are stored into $HOME/data/download/MIMB/
- If HTTPS fails, the import bridge then tries with HTTP.
- If a proxy is used to access internet, you must configure that proxy in the JRE (see the -j option in the Miscellaneous parameter).
- If the import bridge does not have full access to internet, that $HOME/data/download/MIMB/ directory can be copied from another server with internet access where the command $HOME/bin/MIMB.sh (or .bat) -d can be used to download all third-party libraries used by all bridges at once.
By running this import bridge, you hereby acknowledge responsibility for the license terms and any potential security vulnerabilities from these downloaded third-party software libraries.
OVERVIEW
The primary use case of Kafka is for high-performance data pipelines from messaging queue to a full-fledged event streaming platform. In such case, all topics are exclusively composed of Avro files with metadata described in a (Confluent) Kafka registry.
When the "Schema Registry URL" parameter is specified, this import bridge automatically retrieves the metadata of that topic from the Kafka registry.
FREQUENTLY ASKED QUESTIONS
Q: When connecting to Kafka using;
"PLAIN authentication"
Specify 'JAAS configuration path' and leave empty 'Kafka brokers principal name' parameter.
"KERBEROS authentication"
Specify values of both parameters.
"without authentication"
Leave both of these parameters empty.
Please refer to the individual parameter's documentation for more details.
Q: How to fix an exception during import, "Exception:No entry found for connection 2"?
A: The machine which runs the bridge is not properly configured for Kafka DataNode name resolution. When the bridge invokes the Kafka API to ask for the NameNode where the message data is, the NameNode returns the hostname (not an IP) of the DataNode machine. However, in this case the hostname could not be translated into an IP because of hostname resolution issues on that machine. To resolve, add an entry for the DataNode machine hostname in the "hosts" system config file.
LIMITATIONS
Refer to the current general known limitations at MIMB Known Limitations or bundled in Documentation/ReadMe/MIMBKnownLimitations.html
When you run both Kafka cluster (server) version 1.1.x and the import bridge (client) on Windows systems the import could fail with a timeout error. The Kafka version 2.0.x resolved the issue.
SUPPORTED FILES
Data Definition / Schema / Metadata file formats (no data):
- Fixed Width files typically from mainframe (see details below)
- COBOL COPYBOOK files typically from mainframe (see details below)
- W3C XML XSD (XML Schema Definition)
Text data file formats (data sampling driven metadata discovery):
- Delimited (Flat) files such as CSV (see details below)
- Open Office Excel XML .XSLX (see details below)
- W3C XML (not defined from XML XSD)
- JSON (JavaScript Object Notation) (see details below)
Binary data file formats (which include a schema definition as header or footer):
- Apache Avro (see details below)
- Apache Parquet (see details below)
- Apache ORC (see details below)
as well as the compressed versions of the above formats:
- ZIP (as a compression format, not as archive format)
- BZIP
- GZIP
- LZ4
- Snappy (as standard Snappy format, not as Hadoop native Snappy format)
DELIMITED FILES
This bridge detects (reverse engineer) the metadata from a data file of type Delimited File (also known as Flat File).
The detection of such Delimited File is not based on file extensions (such as .CSV, .PSV) but rather by sampling the file content.
The bridge can detect a header row, and use it to create the field name, otherwise generic field names are created.
The bridge samples up to 100 rows in order to automatically detect the field separators which by default include:
', (comma)', '; (semicolon)', ': (colon)', '\t (tab)', '| (pipe)', '0x1 (ctrl+A)', 'BS (\u0008)'
More separators can be added in the auto detection process (including double characters), see the Miscellaneous parameter.
During the sampling, the bridge also detects the file data types, such as DATE, NUMBER, STRING.
FIXED WIDTH FILES
This bridge creates metadata for data files of type Fixed Width File.
Such metadata cannot be automatically detected (reverse engineered) by sampling the data files (e.g. customers.dat or even just customers with no extension).
Therefore, this bridge imports a 'Fixed Width File Definition' file which must be with extension .fixed_width_file_definition format file
(e.g. customers.dat.fixed_width_file_definition format file will create the metadata of a file named file customers with the fields defined inside)
This is the equivalent of a RDBMS DDL for fixed width files. With such a long extension, this data definition file can coexist with the actual data files in the each file system directory containing them.
The 'Fixed Width File Definition' file format is defined as follows:
- Format file must start with the following header:
column name, offset, width, data type, comment
- All offsets must be unique and greater than or equal to 0.
a,0
b,4
- The file format is invalid when some columns have offsets and others don't.
a,0
b,
c,4
- When all columns do not have offsets but have widths the application assumes that columns are ordered and calculates offsets based on widths.
a,,4 -> a,1,4
b,,25 -> b,5,25
- When the offset is present the application ignores widths as they are calculated from the offsets.
a,1,4
b,5,25
- Types and comments are used as documentation only.
a,1,4,int
b,5,25,char[25],identifier
This bridge detects the following data types: INTEGER, FLOAT, STRING, DATE, BOOLEAN.
COBOL COPYBOOK FILES
This bridge can only import the COBOL COPYBOOK files (which contain the data definitions), therefore does not detect (reverse engineer) metadata from actual COBOL data files.
The detection of such COBOL COPYBOOK file is not based on file extensions (such as .CPY) but rather by sampling the file content.
This bridge creates a 'Physical Hierarchical Model' which reflects a truly flat, byte-position defined, record structure, which is useful for stitching to the DI/ETL processes. Therefore, the physical model has all the physical elements required to define a flat record, which is ONE table with all the elements (including multiple columns for OCCURS elements when the proper bridge parameter is set).
Note that this bridge does not currently support the COPY verb, and reports a parsing error at the line and position at which the COPY statement begins. In order to import Copybooks with the Copy Statement, create an expanded Copybook file with the included sections already in place (replacing the COPY verb). Most COBOL compilers have the option to output only the preprocessed Copybooks with the COPY and REPLACE statements expanded.
Frequently Asked Questions:
Q: Why is the default start column '6' (six) and the default end column '72' (seventy-two)?
A: The bridge parser counts columns starting at 0 (zero), rather than 1 (one). Thus, the defaults leave the standard first six columns for line numbers, next column for comment indicators, and last 8 columns (out of 80) for additional line comment information.
EXCEL (XLSX) FILES
This bridge detects (reverse engineer) the metadata from a data file of type Excel XML format (XLSX).
The detection of such Excel file is based on file extension .XLSX.
The bridge can detect a header row, and use it to create the field name, otherwise generic field names are created.
The bridge samples up to 1000 rows to detect the file data types, such as DATE, NUMBER, STRING.
If an Excel file has multiple sheets, each sheet is imported as the equivalent of a file/table with the same sheet name.
This bridge uses the machine's local file system to read files and allows you to specify the character set encoding files use.
This bridge only imports the CSV aspect of Excel, but does not support any BI / Analysis aspects of Excel like pivot tables, charts, etc.
W3C XML FILES
This W3C XML import bridge is used in conjunction with other file import bridges (e.g. CSV, XLSX, Json, Avro, Parquet) by all data lake / file crawler import bridges (e.g. File systems, Amazon S3, Hadoop HDFS).
The purpose of this XML import is to reverse engineer a model/schema from its content, when such XML was not formally defined by an XML Schema (XSD or DTD).
Such XML files are common from IoT devices uploaded into a data lake.
Nevertheless, such XML files are expected to be fully W3C compliant, especially with respect to the XML text declaration, well-formed parsed entities, and character encoding of entities.
See W3C standards for more details:
https://www.w3.org/TR/xml/#sec-TextDecl
Warning, you must use the dedicated XML based import bridges for all other needs such as:
- other standard W3C XML import bridges (e.g. DTD, XSD, WSDL, OWL/RDL)
- tool specific XML import bridges (e.g. Erwin Data Modeler XML, Informatica PowerCenter XML)
JSON FILES
This bridge imports metadata from JSON files using the Java API.
This bridge loads the entire JSON file using a streaming parser, therefore there are no size limits, although it may take time if it is a remote large JSON file.
This bridge extracts the metadata (JSON hierarchical structure) and detects the following standard JSON data types:
as defined in https://www.json.org/
- String {"stringSample" : "some text", "stringDateSample" : "Thu Apr 06 2017 09:41:51 GMT+0300 (FLE Standard Time)", "expStringSample" : "2.99792458e8"}
- Number {"expNumberSample": 2.99792458E8, "numberSample": 3, "floatSample": 3.141592653589793}
- Array {"arraySample": [1,2,3]}
- True {"booleanSample": true}
- False {"booleanSample": false}
- Null {"nullSample": null}
In addition, the following implementation specific data types are supported:
MongoDB extension:
- The identifier {"_id": {"$oid": "50a9c951300493f64fbffdb6"}}
- Date {"dateExample" : { "$date" : "2014-01-01T05:00:00.000Z"}}
- POSIX date {"isoDateExample" : { "$date" : 1491461103897 }}
- Timestamp {"timestampExample" : { "$timestamp" : { "t" : 1412180887, "i" : 1 } }}
- Number {"numberLongExample": {"$numberLong": "7494814965"}}
CouchDB extension:
- The identifier {"_id":"someId","_rev":"1232343467"}
APACHE AVRO FILES
This bridge imports metadata from Avro files using a Java API.
Note that this bridge is not performing any data driven metadata discovery, but instead reading the schema definition at the header (top) of the Avro file.
This bridge detects the following standard Avro data types:
https://avro.apache.org/docs/current/spec.html#schema_primitive
null - no value.
boolean - a binary value.
int - a 32-bit signed integer.
long - a 64-bit signed integer.
float - a single precision (32 bit) IEEE 754 floating-point number.
double - double precision (64-bit) IEEE 754 floating-point number.
bytes - sequence of 8-bit unsigned bytes.
string - Unicode character sequence.
APACHE PARQUET FILES
This bridge imports metadata from Parquet files using a Java API.
Note that this bridge is not performing any data driven metadata discovery, but instead reading the schema definition at the footer (bottom) of the Parquet file. Therefore, this bridge needs to load the entire Parquet file to reach the schema definition at the end.
If the Parquet file is not compressed, there are no file size limit as the bridge automatically skips the data portion until the footer (although this may take time on large Parquet files). However, if the Parquet file is compressed, then the bridge needs to download the entire file to uncompress it to start with. Therefore, in such case, there is a default file size limit of 10 MB (any bigger files will be ignored), however this limit can be increased in in the Miscellaneous parameter.
This bridge detects the following standard Parquet data types:
as defined in https://parquet.apache.org/documentation/latest
BOOLEAN: 1 bit boolean
INT32: 32 bit signed ints
INT64: 64 bit signed ints
INT96: 96 bit signed ints
FLOAT: IEEE 32-bit floating point values
DOUBLE: IEEE 64-bit floating point values
BYTE_ARRAY: arbitrarily long byte arrays.
APACHE ORC FILES
This bridge imports metadata from ORC files using a Java API.
Note that this bridge is not performing any data driven metadata discovery, but instead reading the schema definition at the header (top) of the ORC file.
This bridge detects the following standard ORC data type:
as defined in https://orc.apache.org/docs/types.html
Integer: boolean (1 bit), tinyint (8 bit), smallint (16 bit), int (32 bit), bigint (64 bit)
Floating point: float, double
String types: string, char, varchar
Binary blobs: binary
Date/time: timestamp, timestamp with local time zone, date
Compound types: struct, list, map, union
MORE INFORMATION
Please refer to the individual parameter's tooltips for more detailed examples.
Bridge Parameters
Parameter Name | Description | Type | Values | Default | Scope | ||
Schema Registry URL | Comma-separated list of URLs for Schema Registry instances that can be used to look up schemas. See https://docs.confluent.io/current/schema-registry/connect.html#configuration-options for details. If you need to import metadata only, you must specify the Schema Registry URL. When the Schema Registry URL is specified, the bridge will import metadata about Topics and their Schemas from the Registry (without involving the Kafka data server). |
STRING | http://localhost:8081 | ||||
Topics | List of topic names, such as 'topic1, topic2'. If list is empty, then all topics are available. You can specify topic names as a wildcard pattern: 'topic?' '*topic*' 'topic_?,*topic*' If you need to import the metadata only, specify the Topics parameter, and leave all other Kafka data connection and sampling configuration parameters empty. |
REPOSITORY_SUBSET | |||||
Bootstrap servers | List of 'host:port' pairs to use for establishing the initial connection to the Kafka cluster, and finding available servers and topics, e.g. 'host1:port1, host2:port2' The list does not need to include all available servers but should have at least one. You may want to include more than one server in case any of them are unavailable. The first entry from this list would be used as a cluster name. |
STRING | localhost:9092 | ||||
Number of sample messages | The maximum number of messages to sample from topics. These messages are used to identify topic format details, like field names and data types. | STRING | 1000 | ||||
Use SSL protocol to connect | Set this parameter to True when the Kafka consumer uses TLS/SSL to encrypt Kafka's network traffic. Kafka uses SSL to encrypt connections between the server and clients |
BOOLEAN |
|
False | |||
Truststore file | Location of the trust store file. If it is empty the import bridge would try to locate it under 'java.home'\lib\security\{'jssecacerts'|'cacerts'} |
FILE | *.* | ||||
Password of the truststore | Password of the truststore. | PASSWORD | |||||
Keystore file | The location of the keystore file. | FILE | *.* | ||||
Password of the keystore | Password of the keystore. | PASSWORD | |||||
Password for the key | Password for the key. | PASSWORD | |||||
JAAS configuration path | Enter the primary part of the Kerberos principal you defined for the brokers when you were creating the broker cluster. For example, in this principal 'kafka/kafka1.hostname.com@EXAMPLE.COM', only the prefix is required for this field, "kafka". This value is populated under the Kafka property: sasl.kerberos.service.name=value |
FILE | *.* | ||||
Kafka brokers principal name | Enter the primary part of the Kerberos principal you defined for the brokers when you were creating the broker cluster. For example, in this principal 'kafka/kafka1.hostname.com@EXAMPLE.COM', only the prefix is required for this field, "kafka". This value is populated under the Kafka property: sasl.kerberos.service.name=value |
STRING | |||||
Kinit command path | Kerberos uses a default path to its Kinit executable. If you have changed this path, enter the custom access path here. Kafka property value - sasl.kerberos.kinit.cmd=value |
STRING | |||||
Kerberos configuration path | Kerberos uses a default path to its configuration file, the krb5.conf file (or krb5.ini in Windows) for Kerberos 5 for example. If you leave this parameter clear, a given methodology is applied by Kerberos to attempt to find the configuration information it requires. For details about this methodology, see the section "Locating the krb5.conf Configuration file" under the Kerberos requirements. This value is going to JVM - '-Djava.security.krb5.conf=value' |
FILE | *.* | ||||
Miscellaneous | INTRODUCTION Specify miscellaneous options starting with a dash and optionally followed by parameters, e.g. -connection.cast MyDatabase1="MICROSOFT SQL SERVER" Some options can be used multiple times if applicable, e.g. -connection.rename NewConnection1=OldConnection1 -connection.rename NewConnection2=OldConnection2; As the list of options can become a long string, it is possible to load it from a file which must be located in ${MODEL_BRIDGE_HOME}\data\MIMB\parameters and have the extension .txt. In such case, all options must be defined within that file as the only value of this parameter, e.g. ETL/Miscellaneous.txt JAVA ENVIRONMENT OPTIONS -java.memory <Java Memory's maximum size> (previously -m) 1G by default on 64bits JRE or as set in conf/conf.properties, e.g. -java.memory 8G -java.memory 8000M -java.parameters <Java Runtime Environment command line options> (previously -j) This option must be the last one in the Miscellaneous parameter as all the text after -java.parameters is passed "as is" to the JRE, e.g. -java.parameters -Dname=value -Xms1G The following option must be set when a proxy is used to access internet (this is critical to access https://repo.maven.apache.org/maven2/ and exceptionally a few other tool sites) in order to download the necessary third-party software libraries. Note: The majority of proxies are concerned with encrypting (HTTPS) the outside (of the company) traffic and trust the inside traffic that can access proxy over HTTP. In this case, an HTTPS request reaches the proxy over HTTP where the proxy HTTPS-encrypts it. -java.parameters -java.parameters -Dhttp.proxyHost=127.0.0.1 -Dhttp.proxyPort=3128 -Dhttp.proxyUser=user -Dhttp.proxyPassword=pass MODEL IMPORT OPTIONS -model.name <model name> Override the model name, e.g. -model.name "My Model Name" -prescript <script name> This option allows running a script before the bridge execution. The script must be located in the bin directory (or as specified with M_SCRIPT_PATH in conf/conf.properties), and have .bat or .sh extension. The script path must not include any parent directory symbol (..). The script should return exit code 0 to indicate success, or another value to indicate failure. For example: -prescript "script.bat arg1 arg2" -postscript <script name> This option allows running a script after successful execution of the bridge. The script must be located in the bin directory (or as specified with M_SCRIPT_PATH in conf/conf.properties), and have .bat or .sh extension. The script path must not include any parent directory symbol (..). The script should return exit code 0 to indicate success, or another value to indicate failure. For example: -postscript "script.bat arg1 arg2" -cache.clear Clears the cache before the import, and therefore will run a full import without incremental harvesting. If the model was not changed and the -cache.clear parameter is not used (incremental harvesting), then a new version will not be created. If the model was not changed and the -cache.clear parameter is set (full source import instead of incremental), then a new version will be created. -backup <directory> This option allows to save the bridge input metadata for further troubleshooting. The provided <directory> must be empty. The primary use of this option is for data store import bridges, in particular JDBC based database import bridges. Note that this option is not operational on some bridges including: - File based import bridges (as such input files can be used instead) - DI/BI repository import bridges (as the tool's repository native backup can be used instead) - Some API based import bridges (e.g. COM based) for technical reasons. DATA CONNECTION OPTIONS Data Connections are produced by the import bridges typically from ETL/DI and BI tools to refer to the source and target data stores they use. These data connections are then used by metadata management tools to connect them (metadata stitching) to their actual data stores (e.g. databases, file system, etc.) in order to produce the full end to end data flow lineage and impact analysis. The name of each data connection is unique by import model. The data connection names used within DI/BI design tools are used when possible, otherwise connection names are generated to be short but meaningful such as the database / schema name, the file system path, or Uniform Resource Identifier (URI). The following option allows to manipulate connections. These options replaces the legacy options -c, -cd, and -cs. -connection.cast ConnectionName=ConnectionType Casts a generic database connection (e.g. ODBC/JDBC) to a precise database type (e.g. ORACLE) for SQL Parsing, e.g. -connection.cast "My Database"="MICROSOFT SQL SERVER". The list of supported data store connection types includes: ACCESS APACHE CASSANDRA DB2/UDB DENODO GOOGLE BIGQUERY HIVE MYSQL NETEZZA ORACLE POSTGRESQL PRESTO REDSHIFT SALESFORCE SAP HANA SNOWFLAKE MICROSOFT SQL AZURE MICROSOFT SQL SERVER SYBASE SQL SERVER SYBASE AS ENTERPRISE TERADATA VECTORWISE HP VERTICA -connection.rename OldConnection=NewConnection Renames an existing connection to a new name, e.g. -connection.rename OldConnectionName=NewConnectionName Multiple existing database connections can be renamed and merged into one new database connection, e.g. -connection.rename MySchema1=MyDatabase -connection.rename MySchema2=MyDatabase -connection.split oldConnection.Schema1=newConnection Splits a database connection into one or multiple database connections. A single database connection can be split into one connection per schema, e.g. -connection.split MyDatabase All database connections can be split into one connection per schema, e.g. -connection.split * A database connection can be explicitly split creating a new database connection by appending a schema name to a database, e.g. -connection.split MyDatabase.schema1=MySchema1 -connection.map SourcePath=DestinationPath Maps a source path to destination path. This is useful for file system connections when different paths points to the same object (directory or file). On Hadoop, a process can write into a CSV file specified with the HDFS full path, but another process reads from a Hive table implemented (external) by the same file specified using a relative path with default file name and extension, e.g. -connection.map /user1/folder=hdfs://host:8020/users/user1/folder/file.csv On Linux, a given directory (or file) like /data can be referred to by multiple symbolic links like /users/john and /users/paul, e.g. -connection.map /data=/users/John -connection.map /data=/users/paul On Windows, a given directory like C:\data can be referred to by multiple network drives like M: and N:, e.g. -connection.map C:\data=M:\ -connection.map C:\data=N:\ -connection.casesensitive ConnectionName Overrides the default case insensitive matching rules for the object identifiers inside the specified connection, provided the detected type of the data store by itself supports this configuration (e.g. Microsoft SQL Server, MySql etc.), e.g. -connection.casesensitive "My Database" -connection.level AggregationLevel Specifies the aggregation level for the external connections, e.g.-connection.level catalog The list of the supported values: server catalog schema (default) KAFKA API OPTIONS -kafka.api.version If it is specified this API version would be used to perform import data samples from Kafka servers. If it is not specified the bridge tries to detect suitable Kafka API version automatically. If the detection fails the default version '2.2.0' would be used. -consumer.group A string that uniquely identifies the group of consumer processes to which this consumer belongs to. By setting the same group ID, multiple processes indicate that they are all part of the same consumer group. This value would be passed into Kafka 'group.ID' property. IMPORT METHODS -import.from.data When utilizing a Kafka server to host Topics that do not reference Schema Registry and some Topics that do, this forces the import bridge to crawl a data lake implemented on Apache Kafka to follow the data route to find topics and use the Schema Registry only when a topic references it. This includes data sampling driven metadata discovery of the data structure (e.g. CSV table, JSON hierarchy, Avro, Parquet) and data types (e.g. Integer, Date, String). However in Kafka, the files are organized in topics instead of partitions. The "Bootstrap servers" parameter must be specified to use this import method. |
STRING |
Bridge Mapping
Mapping information is not available