Skip to main content
Skip table of contents

Lesson - Parallel Processing Fixed Width Messages

In this lesson, you continue the exploration of the Integration Engine. You will develop a system that receives a large Fixed message and uses asynchronous features of the NexJ Framework to process multiple portions of that data concurrently.

On completing this lesson, participants will:

  • have gained experience working with the Fixed message format, object queue and file channels, and integration services.
  • be able to describe the purpose of the fixed-width message format

Key Concepts

When a Fixed file is parsed, it is consumed by the Integration Engine as a single message. The entire data of a received Fixed message is parsed before the message is passed to the next stage of the integration flow, which is typically the next step of an integration service. Fixed messages may be paged during parsing, allowing for extremely large messages to be consumed.

Though this approach allows for very large volumes of data to be processed in the very straight forward approach that other smaller messages follow, this approach may not be suitable for a given business solution because:

  1. this approach involves paging the entire incoming message to temporary disk space, approximately doubling the free disk space requirements when receiving such a message, and
  2. this approach is serial, meaning it is operated on by only one thread.

If the processing of the rows of data within a Fixed message (the records part) is not dependent on the order of those rows, then the nature of a Fixed message definition allows the processing to operate on different parts of the message concurrently. That is, we can parallelize the processing of the extremely large volume of data by processing it in chunks.

To do this, we leverage the asynchronous capabilities of the Integration layer. The basic algorithm we will develop is:

  1. A function analyses the raw file (message) size, calculates the size and offsets of each chunk, and sends these chunk values in messages to a message queue.
  2. A second service is configured to operate on the message queue. That service reads a message and uses the chunk values to stream just that chunk of data from the very large text file, parsing the chunk and the operating on the rows of that chunk.

Since the message queue has multiple receivers, the chunk processing runs concurrently, once for each receiver. And we will choose chunk sizes in such as way that the parsing can happen in memory without need for paging. This approach alleviates the two concerns highlighted from the standard Integration approach when it comes to handling exteremely large data.

Define The Fixed Message

In this lab, you require a fixed message definition for the integration message the system will receive.

  1. In the Integration > Messages tab, right-click and select New Message. In the Name field, enter LegacyCustomerFixed and click Finish.
  2. In the Overview tab, select the LegacyCustomerFixed message root. Set the Format field to Fixed.
  3. Right-click the root node of the LegacyCustomerFixed message and select Insert Child > Message.
  4. In the Properties view for this new message node, set:
    • Name: rows
    • minCount: 0
    • maxCount: 0
      Note: A maxCount of 0 means a collection of unlimited size.
  5. Right-click the rows node and select Insert Child > Value.
  6. In the Properties view for this new message node, set:
    • Name: customerId
    • Type: string
    • minCount: 1
  7. For the customerId node, click the Create Mapping button and set:
    • Width: 10

Repeat steps 5-7 for these additional fields:

NameTypeminCountWidth
firstNamestring130
lastNamestring130
birthDatestring111
statusCodestring14

Generating Test Data File

Using the message file defined above, you will generate a large volume of test data.

  1. In the Resources > Scratchpads tab, right-click and select New Scratchpad. In the Name field, enter GenerateTestLegacyData and click Finish.
  2. Enter the following code into the scratchpad:

    CODE
    ;;;;----------------
    ;;;; Code to generate a test file for LegacyCustomerFixed.message
    ;;;;
    (import 'java.io.BufferedWriter)
    (import 'java.io.OutputStreamWriter)
    (import 'java.io.File)
    (import 'java.io.FileOutputStream)
    (import 'java.util.Random)
    
    (define cal (java.util.Calendar'instance))
    
    (define rand (java.util.Random'new ((now)'time)))
    
    (define statii #("PROS" "CLNT" "JONT" "HHLD" "FRMR" "UNKN" "XXXX"))
    
    (define statii-len (vector-length statii))
    
    ; Returns a random timestamp between Jan 1 1910 and Dec 31 2009
    (define (get-random-date)
       (cal'set (cal'YEAR) (+ 1910 (rand'nextInt 100)))
       (cal'set (cal'MONTH) (rand'nextInt 12))               ; first month is 0
       (cal'set (cal'DAY_OF_MONTH) (+ 1 (rand'nextInt 31)))  ; Calendar handles "Feb 31"
       ; return
       (cast timestamp ((cal'time)'time))
    )
    
    ; Returns a random 4-character "status" string (statuses herein are fictitious)
    (define (get-random-status)
       (vector-ref statii (rand'nextInt statii-len))
    )
    
    ; Generates a LegacyCustomerFixed message file filled with records of test data
    ; @arg numMessages long The number of records to generate
    ; @arg filePath string The full path to the file to be created
    ; @ret none
    ; @example
    ; ; create 20M records -- takes approximately 10 minutes (~35000 recs/sec)
    ; > (generate-test-data 20000000 "c:/tmp/legacy.txt")
    ;
    (define (generate-test-data numMessages filePath)
       (let*
          (
             (file (java.io.File'new filePath))
             (out
                (java.io.BufferedWriter'new
                   (java.io.OutputStreamWriter'new
                      (java.io.FileOutputStream'new file)
                   )
                )
             )
             (fmtPattern
                (string-append
                   "id{0,number,00000000}"                      ; customerId
                   "fn{0,number,0000000000000000000000000000}"  ; firstName
                   "ln{0,number,0000000000000000000000000000}"  ; lastName
                   "{1,date,MMM dd yyyy}"                       ; birthDate
                )
             )
          )
    
          (for
             ((i 0))            ; pre
             (< i numMessages)  ; test
             (set! i (+ i 1))   ; post
    
             ; body of for loop
             (out'write (format fmtPattern i (get-random-date)))
             (out'write (get-random-status))
             (if (= 0 (modulo i 50000)) (logger'info i "records written"))
          )
    
          (out'close)
          (logger'info "Wrote" numMessages "messages.  Size of" filePath
             "is" (file'length) "bytes")
       )
    )
  3. Save the file.
  4. Start a Minimal Scheme Console.
  5. Execute the code from the scratchpad in the console. This creates all the supporting functions.
  6. Now execute the following code to actually generate the test data file. Modify the parameters to meet your needs:

    CODE
    (generate-test-data 20000000 "c:/tmp/legacy.txt")
    With the above parameters, the generated message file is 1.6GB in size and will take approximately 10 minutes to generate.

Process The Test Data Serially (with Paging)

Typical message processing via integration involves having the system consume the message as a whole. In this section, you will process the test data file serially as a single message so that it can be compared with the parallelized approach taken in the remaining sections.

First, create a version of the fixed message that includes paging. Without paging, the system use in excess of 1.6GB of memory simply to load the data prior to processing. Paging allows the system to load the entire message, but pages the data back to disk rather than consuming massive amounts of heap space (memory).

  1. In the Integration > Messages tab, right-click LegacyCustomerFixed, select Create Copy... and name the new message LegacyCustomerFixed_paged.
  2. To edit this new message, right-click LegacyCustomerFixed_paged and select Open.
  3. In the Overview tab, expand the LegacyCustomerFixed_paged message root.
  4. Click the rows node, then click the Create Mapping button.
  5. Set the Page Size parameter to 2000. This causes the system to page data (write it to a temporary file on disk) after every 2000 records that have been read from the message file.
  6. Save your changes.

Next, in a scratchpad define a function that will open the data file, manually parse the data and log the data from every N-th row:

  1. In the Resources > Scratchpads tab, right-click and select New Scratchpad. In the Name field, enter FixedProcessSerially and click Finish.
  2. Enter the following code into the scratchpad:

    SCHEME
    ; @arg fixedMsgName string The name of the fixed message
    ; @arg recordsName string The name of the collection part of the fixed message
    ; @arg filePath string The full path to the file to be created
    ; @arg printEvery long Log information each time this number of records are processed
    ; @return The number of records received in the parsed message
    ; @example 
    ; > (fixed-process-serially "LegacyCustomerFixed_paged" "rows" "c:/tmp/legacy.txt" 100000L)
    ; 
    (define (fixed-process-serially fixedMsgName recordsName filePath printEvery)
       (let*
          (
             (inputStream
                (nexj.core.integration.io.StreamInput'new
                   (java.io.FileInputStream'new
                      (java.io.File'new filePath)
                   )
                )
             )
             (parseResult ())
             (metadata ((invocation-context)'metadata))
             (messageMeta (metadata'getMessage fixedMsgName))
             (messageParser (((messageMeta'format)'parser)'getInstance '()))
             (numRecords 0)
          )
    
          (logger'info "Start parse")
    
          (set! parseResult (messageParser'parse inputStream messageMeta))
          (set! numRecords ((parseResult'rows)'size))
    
          (logger'info "Done parse! Received" numRecords "records")
    
          (for
             (                               ; pre
                (i 0)
                (printRow ())
             )
             (< i numRecords)                ; test
             (begin                          ; post
                (set! i (+ i printEvery))
             )
    
             ; body of for loop
             (set! printRow ((parseResult'rows) i))
    
             (logger'info
                "serial - last record: customerId<" (printRow'customerId)
                "> birthDate<" (printRow'birthDate)
                "> statusCode<" (printRow'statusCode)
                ">"
             )
          )
    
          ; return
          numRecords
       )
    )
  3. Save the file.
  4. Start a Scheme Server Console.
    Note: The serial processing can be executed in a Scheme Minimal Console. However, the parallel processing you will do in later sections do require server facilities. Use the Server console for both to make comparisons of both approaches more consistent.
  5. Execute the code from the scratchpad in the console. This creates the function.
  6. Now execute the following code in the Scheme Console to actually generate the test data file. Modify the parameters to meet your needs:

    SCHEME
    (fixed-process-serially "LegacyCustomerFixed_paged" "rows" "c:/tmp/legacy.txt" 100000L)
    With the above parameters, the data is processed while logging every 100,000th record. This process will take approximately 12 minutes.

Setting Up For Chunking

In this section you will process the same large amount of data but do so using the asynchronous mechanism of message queues. A function will send "chunk" messages to the message queue, and a service bound to that message queue will receive a chunk message and process that chunk of the data file. Because the message queue has multiple receivers configured, multiple chunks will be processed concurrently.

  1. Create the Message Queue where the chunk messages will be sent.
    1. In the Integration > Channels tab, right-click and select New Channel.
    2. In the Name field, enter FixedChunk.
    3. Click Finish.
    4. In the new message, set the Alias field to FIXED_CHUNK_QUEUE.
    5. Save your changes.
  2. Define the message structure for communicating chunk offsets.
    1. In the Integration > Messages tab, right-click and select New Message. In the Name field, enter FixedChunk and click Finish.
    2. Right-click the root node of the FixedChunk message and select Insert Child > Value.
    3. In the Properties view for this new message node, set:
      • Name: messageName
      • minCount: 1
        Note: A minCount of 1 means it is a required value.
      • Type: string
    4. Repeat steps 2-3 for these additional fields:

      NameTypeminCount
      recordsNamestring1
      filePathstring1
      firstByteOfChunkstring1
      numBytesPerChunkstring1
  3. Define the service that will be bound to the message queue for processing the chunk messages.
    1. In the Integration > Services tab, right-click and select New Service.
    2. In the Name field, enter FixedChunkProcessService.
    3. Click Finish.
    4. In the new service, add a Script step with the following properties:
      • Name: Extract body
      • Script: (this'body)
    5. Add a second Script step after Extract body with the following properties:
      • Name: Parse chunk
      • Script:

        SCHEME
        (import 'nexj.core.util.LimitInputStream)
        
        (let*
           (
              (inputStream (file-get-input-stream (this'filePath)))
              (limitInputStream ())
              (streamInput ())
              (parseResult ())
              (bytesSkipped (inputStream'skip (this'firstByteOfChunk))) 
              (metadata ((invocation-context)'metadata))
              (messageMeta (metadata'getMessage (this'messageName)))
              (messageParser (((messageMeta'format)'parser)'getInstance '()))
           )
        
           (if (< bytesSkipped (this'firstByteOfChunk))
              (error "Number of bytes skipped, {0}, less than expected, {1}."
                 bytesSkipped (this'firstByteOfChunk))
           )
        
           (set! limitInputStream
              (nexj.core.util.LimitInputStream'new inputStream (this'numBytesPerChunk) #f))
           (set! streamInput (nexj.core.integration.io.StreamInput'new limitInputStream))
           (set! parseResult (messageParser'parse streamInput messageMeta))
           
           (limitInputStream'close)
           parseResult
        )
    1. Add a Log step after Parse chunk with the following properties:
      • Name: log
      • Level:
        "info"
      • Id:
        "chunk - last record: customerId<{0}> birthDate<{1}> statusCode<{2}>"
      • Arguments:
        (((this'rows) (- ((this'rows)'size) 1))'customerId)
        (((this'rows) (- ((this'rows)'size) 1))'birthDate)
        (((this'rows) (- ((this'rows)'size) 1))'statusCode)
    2. Save your changes.
  1. Bind the service to the message queue.
    1. Open the FixedChunkQueue channel.
    2. In the Service Bindings tab, click the Add button.
    3. For the new service binding, set the Service to FixedChunkProcessService.
    4. Save your changes.
  2. Add the message queue to your environment.
    1. In the Deployment > Environments tab,open the Development environment .
    2. In the Channel Connections tab, click the Select button.
    3. In the Select Channel Connections dialog, in the left column select FixedChunkQueue and click the Add button.
    4. Click OK.
    5. Save your changes.
  3. Add the function that generates chunk messages.
    1. In the Resources > Libraries tab, right-click and select New Library. In the Name field, enter FixedProcessInChunks and click Finish.
    2. Enter the following code into the scratchpad:

      SCHEME
      (declare scope server)
      
      ; Calculates the parameters of each chunk and sends them as a FixedChunk message
      ; to a queue for asynchronous processing
      ; @arg fixedMsgName string The name of the fixed-width message definition
      ; @arg collectionName string The name of the collection part that holds the records
      ; @arg filePath string The path to the file
      ; @arg numRecordsPerChunk long The number of records to process per chunk
      ; @arg queueName string The name of the message queue we send FixedChunk messages to
      ; @return none.
      ; @example
      ; > (fixed-process-in-chunks "LegacyCustomerFixed" "rows" "c:/tmp/legacy.txt"
      ;      100000 "FixedChunkQueue")
      ;
      (define (fixed-process-in-chunks fixedMsgName collectionName filePath
                 numRecordsPerChunk queueName)
         (let*
            (
               (fileLen (file-get-length filePath))
               (recSize (fixed-record-width fixedMsgName collectionName))
               (chunkBytes (* numRecordsPerChunk recSize))
               (chunkCount 0)
            )
      
            (logger'info "File length is" fileLen "bytes")
            (logger'info "Record size is" recSize "bytes")
            (logger'info "Chunk size is" chunkBytes "bytes")
      
            ; make sure the fixed message is chunkable
            (if (not (= 0 ((fixed-message-prefix fixedMsgName)'length)))
               (error "Prefix of message {0} must be null in order to chunk" fixedMsgName)
            )
            (if (not (= 0 (fixed-record-pagesize fixedMsgName collectionName)))
               (error "Page size of records part {1} of {0} must be 0 in order to chunk"
                  fixedMsgName collectionName)
            )
      
            (for ((i 0))            ; pre
                 (< i fileLen)      ; test
                 (begin (set! i (+ i chunkBytes)) (set! chunkCount (+ chunkCount 1))) ; post
      
               ; body
               (integration-send-receive
                  (message
                     (: :class "MessageQueue")
                     (: persistent #t)
                     (: body
                        (message
                           (: :class "FixedChunk")
                           (: messageName fixedMsgName)
                           (: recordsName collectionName)
                           (: filePath filePath)
                           (: firstByteOfChunk i)
                           (: numBytesPerChunk chunkBytes)
                        )
                     )
                  )
                  queueName
                  ()
               )
            )
      
            (logger'info "Generated " chunkCount "messages to " queueName "queue")
         )
      )
      Save the file.

Process the Test Data Concurrently

In this section, you will process the test data file concurrently.

  1. Start a Scheme Server Console. You need to use the Server Console as the server's facilities are used to manage the message queues.
  2. Now execute the following code in the Scheme Console to actually generate the test data file. Modify the parameters to meet your needs:

    SCHEME
    (fixed-process-in-chunks "LegacyCustomerFixed" "rows" "c:/tmp/legacy.txt"
       100000 "FixedChunkQueue")

With the above parameters, the data is processed in chunks of 100,000 records. The service logs the last record of its chunk, so the output is similar to that of the serialized processing. This process will take approximately 1 minute to complete, though control will return to the caller (the Scheme console) as soon as the chunk messages have all been sent to the message queue. The concurrent processing of those messages occurs independently of the caller.

Conclusion

n this chapter you have leveraged the asynchronous capabilities of the Integration engine via message queues.

Additional enhancements to this approach include:

  • Provide different processing of the records other than simply logging, such as creating class instances and commiting them to a data source.
  • Tracking each chunk as it is consumed, providing statistics and auditing of the process's progress.
  • Synchronizing consumption of the chunks, possibly using a SysCounter, to enable cleaning up after all chunks are consumed such as deleting the data file.
  • Using an Object Queue in place of a Message Queue, providing greater control and ability to track progress and order of chunk consumption.


JavaScript errors detected

Please note, these errors can depend on your browser setup.

If this problem persists, please contact our support.