Use the table importer API to programmatically connect your data warehouse with the Arize platform.
For a brief overview of GraphQL itself, please consult our introduction.
Creating Import Jobs
To create an import job, you must first configure the necessary permissions and access for Snowflake, Databricks, or BigQuery.
Import jobs are scoped to a Space. To create an import job, retrieve your space ID from the platform URL:
/organizations/:organization_id/spaces/:space_id.
Import Job Tips:
- Create a reusable mutation to easily configure multiple table import jobs.
- Validate your table import job before you actually create it by setting the dryRun parameter on the createTableImportJob mutation.
- dryRun (boolean): if true, Arize attempts the import job without writing any changes. Use the validationResult field to check the validation status.
mutation dryRunTableImportJob($input: CreateTableImportJobInput!) {
createTableImportJob(input: $input) {
validationResult {
validationStatus
error {
message
}
}
}
}
Variables:
{
"input": {
"spaceId": "space_id",
"modelName": "My Test Model",
"modelType": "score_categorical",
"modelEnvironmentName": "production",
"tableStore": "BigQuery",
"bigQueryTableConfig": {
"projectId": "test-project-id",
"dataset": "test-dataset",
"tableName": "test-table"
},
"schema": {
"predictionId": "prediction_id",
"predictionLabel": "prediction",
"timestamp": "prediction_ts",
"features": "feature_",
"actualLabel": "actual",
"version": "model_version",
"tags": "tag_",
"changeTimestamp": "partition_timestamp"
},
"dryRun": true
}
}
Alternatively, you may pass in the inputs directly:

mutation {
createTableImportJob(input: {
spaceId: "space_id",
modelName: "My Test Model",
modelType: score_categorical,
modelEnvironmentName: production,
tableStore: BigQuery,
bigQueryTableConfig: {
projectId: "test-project-id",
dataset: "test-dataset",
tableName: "test-table"
},
schema: {
predictionId: "prediction_id",
predictionLabel: "prediction",
timestamp: "prediction_ts",
features: "feature_",
actualLabel: "actual",
version: "model_version",
tags: "tag_",
changeTimestamp: "partition_timestamp"
},
dryRun: true
}){
validationResult {
validationStatus
error {
message
}
}
}
}
mutation createTableImportJob($input: CreateTableImportJobInput!) {
createTableImportJob(input: $input) {
tableImportJob {
id
}
}
}
Variables:
{
"input": {
"spaceId": "space_id",
"modelName": "My Test Model",
"modelType": "score_categorical",
"modelEnvironmentName": "production",
"tableStore": "BigQuery",
"bigQueryTableConfig": {
"projectId": "test-project-id",
"dataset": "test-dataset",
"tableName": "test-table"
},
"schema": {
"predictionId": "prediction_id",
"predictionLabel": "prediction",
"timestamp": "prediction_ts",
"features": "feature_",
"actualLabel": "actual",
"version": "model_version",
"tags": "tag_",
"changeTimestamp": "partition_timestamp"
}
}
}
Alternatively, you may pass in the inputs directly:

mutation {
createTableImportJob(input: {
spaceId: "space_id",
modelName: "My Test Model",
modelType: score_categorical,
modelEnvironmentName: production,
tableStore: BigQuery,
bigQueryTableConfig: {
projectId: "test-project-id",
dataset: "test-dataset",
tableName: "test-table"
},
schema: {
predictionId: "prediction_id",
predictionLabel: "prediction",
timestamp: "prediction_ts",
features: "feature_",
actualLabel: "actual",
version: "model_version",
tags: "tag_",
changeTimestamp: "partition_timestamp"
}
}){
tableImportJob {
id
}
}
}
The variables in the example above show one mapping for BigQuery. Learn how to map your data for each warehouse: Snowflake, Databricks, and BigQuery. Use the GraphQL docs here for a list of all the available fields for a query or mutation.
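For reference, a Snowflake job would replace the bigQueryTableConfig block in the variables above with a snowflakeTableConfig. This is a sketch reusing the connection fields from the event-based Snowflake example later on this page:

"tableStore": "Snowflake",
"snowflakeTableConfig": {
  "accountID": "snowflake-account-id",
  "schema": "snowflake-schema",
  "database": "database-name",
  "tableName": "table-name"
}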
Querying for Import Jobs
You can query for table import jobs using a Space node.
query {
node(id: "space_id") {
... on Space {
tableJobs(first: 50) {
edges {
node {
id
modelName
jobStatus
totalQueriesFailedCount
totalQueriesSuccessCount
}
}
}
}
}
}
If you have a large number of import jobs, use pagination to retrieve the complete list for use in your queries and mutations.
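A minimal pagination sketch, assuming the tableJobs connection exposes the standard Relay-style pageInfo fields; pass the endCursor from one page as the after argument on the next:

query {
  node(id: "space_id") {
    ... on Space {
      tableJobs(first: 50, after: "end_cursor_from_previous_page") {
        pageInfo {
          endCursor
          hasNextPage
        }
        edges {
          node {
            id
            modelName
            jobStatus
          }
        }
      }
    }
  }
}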
Query for fields directly from a specific import job using the TableImportJob node and ID.
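For example, a minimal sketch that selects the same fields shown in the Space query above:

query {
  node(id: "ID") {
    ... on TableImportJob {
      id
      modelName
      jobStatus
      totalQueriesFailedCount
      totalQueriesSuccessCount
    }
  }
}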
To view the queries for a given import job, use the TableImportJobQueryConnection, which returns the total number of queries and information on each individual query.
query {
node(id: "ID") {
... on TableImportJob {
queries(first: 10) {
totalCount
edges{
node {
queryId
createdAt
recordsProcessedCount
windowStartTimestamp
windowEndTimestamp
}
}
}
}
}
}
Pausing Import Jobs
Pause an existing table import job using the pauseTableImportJob mutation and job ID.
mutation {
pauseTableImportJob(input: {jobId: "ID"}) {
tableImportJob {
jobStatus
}
}
}
Resuming Import Jobs
Resume a paused import job by using the startTableImportJob mutation and job ID.
mutation {
startTableImportJob(input: {jobId: "ID"}) {
tableImportJob {
jobStatus
}
}
}
Event-Based Table Import Jobs
Available for Snowflake only.
If you are connecting to a warehouse or table for the first time, please complete the one-time permissions setup. For assistance, contact support@arize.com.
Step 1: Create Table Job
To enable event-driven ingestion, set up the table connection and initialize the job by running the following mutation:
mutation CreateTableJob {
createTriggeredOngoingTableImportJob(input: {
spaceId: "space_id",
modelName: "model-name",
modelType: score_categorical,
modelEnvironmentName: production,
tableStore: Snowflake,
    snowflakeTableConfig: {
      accountID: "snowflake-account-id",
      schema: "snowflake-schema",
      database: "database-name",
      tableName: "table-name"
    },
    schema: {
      predictionId: "PRED_ID",
      predictionLabel: "PRED_LABEL",
      timestamp: "TIMESTAMP",
      featuresList: [],
      tagsList: [],
      changeTimestamp: "CHANGE_TIMESTAMP"
    }
}) {
tableImportJob{
jobId
jobStatus
}
space{
id
}
validationResult{
error {
code
message
}
validationStatus
}
}
}
Step 2: Query for the Job ID
Once the job is created, query for the jobId by running the following query:
query queryTableJobId {
  node(id: "space_id") {
... on Space {
tableJobs(first: 1){
edges{
node{
id
jobId
}
}
}
}
}
}
Step 3: Trigger a Query
After the job is created, trigger a query for data within the specified time range using the following mutation:
mutation TriggerTableRun {
createTriggeredOngoingTableRun(input: {
jobId:"<ID from above query>",
    queryStart: "2024-08-20T01:00:00Z", # desired start time for the query
    queryEnd: "2024-08-20T23:00:00Z" # desired end time for the query
}) {
clientMutationId
}
}
Deleting Import Jobs
Delete an import job by using the deleteTableImportJob mutation and job ID.
mutation {
deleteTableImportJob(input: {jobId: "ID"}) {
tableImportJob {
jobStatus
}
}
}
Updating Table Ingestion Parameters
Update table ingestion parameters for a job using the updateTableIngestionParameters mutation and job ID.
mutation {
updateTableIngestionParameters(input: {
jobId: "job_id",
tableIngestionParameters: {
refreshIntervalMinutes: 60,
queryWindowSizeHours: 24
}
}) {
tableImportJob {
id
}
}
}
Updating Table Ingestion Schema
Update the schema used for a table import job using the updateTableImportJob mutation and job ID.
mutation updateTableImportJob(
$jobId: ID!,
$schema: TableImportSchemaInputType!,
$tableIngestionParameters: TableIngestionParametersInputType!
) {
updateTableImportJob(input: {
jobId: $jobId,
schema: $schema,
tableIngestionParameters: $tableIngestionParameters
}) {
tableImportJob {
id
}
}
}
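A sketch of matching variables, reusing the schema mapping and ingestion parameters shown earlier on this page (adjust the column names to your table):

Variables:
{
  "jobId": "job_id",
  "schema": {
    "predictionId": "prediction_id",
    "predictionLabel": "prediction",
    "timestamp": "prediction_ts",
    "features": "feature_",
    "actualLabel": "actual",
    "version": "model_version",
    "tags": "tag_",
    "changeTimestamp": "partition_timestamp"
  },
  "tableIngestionParameters": {
    "refreshIntervalMinutes": 60,
    "queryWindowSizeHours": 24
  }
}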