Creating an Outbound Integration Pipeline
The basic workflow to create an outbound integration pipeline:
Create an integration pipeline.
Create a pipeline interface.
Set up an integration subscription and execute the pipeline.
View the pipeline execution history.
This page provides the instructions for the first step in this workflow. See the subsequent sections for information on each step. This example covers outbound integration from a CSV file. Users can also use the pipeline to integrate data from JSON and XML files.
Prerequisites for following along with the example shown in each section of this documentation:
Identify the Value Chain ID (VC ID) for your dataset.
Identify the Enterprise & Organization you would like to use. You can either create a HUB4 Enterprise and HUB4 Organization to mimic the sample data and code provided, or use your own and change references to HUB4 within our sample data and code.
Load sample data into the system to use in the outbound pipeline. See the "Loading Sample Data" section for instructions.
Complete the following steps to create a new integration pipeline:
Log in to the ONE system.
Click Menu/Favs > Tools > Integration > NEO Plasma Pipelines.
The NEO Plasma Pipelines screen appears.
Click the + (plus) icon in the top right corner.
The Create NEO Plasma Pipeline popup window appears.
Fill out the following fields. Fields with an asterisk ( * ) are required.
Field
Description
Pipeline Name *
Enter a name for the new pipeline.
Version *
Enter a version number for the new pipeline.
Organization
Use the picker tool to select the organization.
Pipeline Type *
Select the pipeline type from the dropdown menu. The options are Inbound Integration and Outbound Integration. For this example, select Outbound Integration.
Click OK.
A success message appears.Click OK.
The NEO Plasma Pipelines screen updates with the new pipeline listed.Click the pipeline icon (highlighted below) to open the new pipeline.
The new pipeline appears with Outbound Source and Outbound Sink nodes displayed.
Click the Node List icon in the top left corner.
The Node List slideout appears.
Click the + (plus) icon for the type of node you want to add. For this example, we selected a parse node. The parse node parses the inbound file data into records.
The parse node appears in the pipeline.
On the new node, click the icon with three vertical dots and click Properties .
The Node Properties popup window appears.
Click to download a CSV sample file to use in this example. You will upload this sample file in the next step. Alternatively, use the data in the "Sample Data to Create a Schema" section of this guide to create your own CSV file to upload in the next step.
Complete the fields described in the table below. Fields with an asterisk ( * ) are required.
Field
Description
Node Name *
Enter a name for the node.
Format Type *
Select a format type for the inbound file from the dropdown list. Options are JSON, CSV, and XML. The remaining fields vary according to the format chosen. For this example, we selected CSV.
Create Schema
a. Click the Upload Sample File link.
The Create Schema popup appears.
b. Complete the following fields. Fields with an asterisk are required.Field
Description
Namespace *
Enter a unique category to organize schema name fields. For this example, we used Input.
Schema *
Enter a unique name with which to associate the dataset. For this example, we used Sites.
Sample File *
Click the upload icon to upload a sample file for the parse node. For this example, you can use the same CSV file downloaded in the previous step, or create a CSV file using the data in the "Sample Data to Create a Schema" section.
c. Click OK.
The Node Properties updates.View Schema
This field auto-populates with a link once the sample file is uploaded in the Create Schema field. Click the link to view or edit the record schemas.
Click OK.
The pipeline screen updates.On the Outbound Source node, click the red dot next to Output and drag the cursor to connect to the Input on the Parse node.
The connection turns green. The parse node can now parse the data from the outbound source.
Click the node list icon in the top left corner again.
The Node List slideout reappears.Click the + (plus) icon next to Script.
A script node is added to the pipeline.On the new script node, click the icon with three vertical dots and click Properties.
The Node Properties popup window appears.
In the Node Name * field, enter a name for the node. For this example, we used Transform CSV to JSON.
On the Input tab in the Ports section, click the arrow to display the port fields.
In the Port Type * field, select Stream of Record from the dropdown list.
The icon beside the dropdown list becomes active.Click the icon beside the Port Type * dropdown list.
The Record Schemas popup window displays.Click the arrow beside Input.
The list of schemas displays.Select the input schema. For this example, we named the schema Sites.
The schema displays in the pane to the right, and the Activate button becomes active.
Click the Activate button.
Click OK.
The Node Properties popup window appears again.In the Ports section, click the Output tab.
Click the arrow to the left of the output to display the Port Name * and Port Type * fields.
In the Port Type * field, select Stream of Record from the dropdown list.
Click the icon to the right of the Port Type * dropdown list.
In the Schema pane on the left, click the arrow beside Output.
The Output record schemas display in the pane.Click the output schema. For this example, select Sites.
The schema displays in the pain to the right, and the Activate button becomes active.Click the Activate button.
Click OK.
The Node Properties popup window reappears.In the Script * field on the Node Properties popup window, click the pencil icon.
The Edit Script popup window appears.Enter the script code for the node. A sample script code is shown below.
def executeNode(inputs):iterable_inputs = {}outputs = {}# Input portsiterable_inputs["Input"] = inputs["Input"]# Type = stream.record# Address, AlternateAddress1, AlternateAddress2, Contact, CreationDate, CreationUser, Description, DisplayName,# EnterpriseName, ExternalRefNo, LastComputedDate, LastModifiedDate, LastModifiedUser, Latitude, Longitude, Name,# OrganizationName, RunReplenishment, ActivationDate, AuthoritativeLevel, AutoGenBuffer, BarCodeDelimiter,# BarCodeFormat, BarCodePrefixing, BillingContactEmail, BillingContactFax, BillingContactMobile, BillingContactName,# BillingContactPhNum, County, DeactivationDate, HolidayCalendarName, IsBilling, IsDC, IsPlant, IsPrimarySubSite,# IsPublic, IsShipping, IsStore, ManagingOrgEnterpriseName, ManagingOrgName, ParentSiteName,# ParentSiteOrganizationEnterpriseName, ParentSiteOrganizationName, PrimarySubSiteName, ReceivingCalendarName,# ReceivingContactEmail, ReceivingContactFax, ReceivingContactMobile, ReceivingContactName, ReceivingContactPhNum,# ShippingCalendarName, ShippingContactEmail, ShippingContactFax, ShippingContactMobile, ShippingContactName,# ShippingContactPhoneNum, TransportationInstructions, TransSiteGroupName, TransSiteGroupOrganizationEnterpriseName,# TransSiteGroupOrganizationName, ApptSchedulingSystem, ApptSchedulingSystemChangedDate, EnableSoftAppointments,# SiteGroupName, Tier, TimeZoneId, DefaultDockDoorCount, NotifyVASP, TypeName, AllowedDetentionTime# US~State~City~Address_ent~Address_org~Address_site~Street1~Street2~Street3~Zip~Latitude~Longitude~Time_zoneaddress_components = {'State':1,'City':2,'Street1':6,'Street2':7,'Zip':9,}# Add node logic hereforrecord in iterable_inputs["Input"]:o_record = {}o_record['Name'] = record['Name']o_record['Organization'] = record['OrganizationName']o_record['Enterprise'] = record['EnterpriseName']o_record['Contact'] = record['Contact']o_record['Address'] = parse_addr(record['Address'], address_components)yield {"Output": o_record }# Activate and set outputs (omit a port to prevent execution of nodes that depend on that port)yield {"Output": None }# Type = stream.record# Name, Organization, Enterprise, Contact, Addressreturnoutputsdef parse_addr(addr, address_components):comps = addr.split('~')result = {}forkey in address_components:result[key] = comps[address_components[key]]returnresultClick Save.
The Node Properties popup window appears.Click Save.
The pipeline screen reappears.On the Parse node, click the red dot next to Parsed and drag the cursor to connect to Input 1 on the script node.
The connection turns green. The stream of records from the parser has now been converted to the correct format.
Click the node list icon in the top left corner again.
The Node List slideout reappears.Click the + (plus) icon next to Format.
A format node is added to the pipeline.On the new Format node, click the icon with three vertical dots and click Properties.
The Node Properties popup window appears.
Complete the following fields. Fields with an asterisk ( * ) are required.
Field
Description
Node Name *
Enter a name for the node. In this example, we used Format JSON.
Format Type *
Select the desired format type from the dropdown list. For this example, we used JSON. Note that the remaining fields change based on the format type selected.
Template
Click the pencil icon.
The Edit Script popup appears.In the Edit Current Code field, enter the script. For this example, we used the following script:
{"sites": [{%forrecord in records %}{% filter ppjson|indent(first=True) %}{{ record|tojson }}{% endfilter %}{%ifnot loop.last %}{{',\n'}}{% endif %}{% endfor +%}]}
Schema *
Select the output schema from the dropdown list. For this example, we selected Output/Sites.
Click OK.
The pipeline screen reappears.On the Format node, click the red dot next to Output and drag the cursor to connect to File on the Outbound Sink node.
Repeat this process to add additional nodes as desired. The following node types are available:
Parse
Format
Collect Records
Normalize
Sort
Script
Click the Save button to save the pipeline.
Click the Run Test icon in the top right corner to test the pipeline.
The Test Run Pipeline slideout appears.
Click the upload icon to upload the inbound source file.
Click the Run Test button.
The Node Logs section appears at the bottom of the screen with the successful nodes turning green in the Execution Sequence column.
If the test run is correct, the next step is to create a pipeline interface. See the "Creating a Pipeline Interface" section for instructions.