Wednesday, 9 July 2014

Create SSIS Package programatically using SSIS API in c#

Hi friends,

Today I am going to show you, how you can create a SSIS package programmatically in C#.
Before that, let's discuss first problem statement or scenario where we might need this kind of solution.

Problem Statement: I need to extract data from multiple tables in a database and save that extracted data to a separate database for the purpose of read-only use (for example Reporting purpose). We need to do this because we want to avoid load on OLTP system. Requirement is to automate this process, so any time I should be able to create a new table (based on new requirement coming) and load/update data in that table without writing code each time.

Solution: Here I created a table(as per below mentioned table schema) programmatically in database now next step to load data on this table.

CREATE TABLE [dbo].[TestTableDest]
(
    [ProductID] [int] IDENTITY(1,1) NOT NULL,
    [Name] [varchar](50) NOT NULL,
    [ProductNumber] [varchar](50) NOT NULL,
 CONSTRAINT [IX_TestTableDest] UNIQUE NONCLUSTERED
(
    [ProductID] ASC
)WITH (PAD_INDEX  = OFF, STATISTICS_NORECOMPUTE  = OFF, IGNORE_DUP_KEY = ON, ALLOW_ROW_LOCKS  = ON, ALLOW_PAGE_LOCKS  = ON) ON [PRIMARY]
) ON [PRIMARY]


 Now let's create SSIS package to load data to this table programmatically using C#. Requirement to insert new rows (if it is new) or update the existing row (if it is already exists) based on unique key value.
To do this we need below assemble reference to our project.

1. Microsoft.SqlServer.ADONETSrc
         Location: C:\Program Files (x86)\Microsoft SQL Server\100\DTS\PipelineComponents\Microsoft.SqlServer.ADONETSrc.dll
2. Microsoft.SqlServer.DTSPipelineWrap
         Location: C:\Program Files (x86)\Microsoft SQL Server\100\SDK\Assemblies\Microsoft.SqlServer.DTSPipelineWrap.dll
3. Microsoft.SqlServer.DTSRuntimeWrap
         Location: C:\Program Files (x86)\Microsoft SQL Server\100\SDK\Assemblies\Microsoft.SQLServer.DTSRuntimeWrap.dll
4. Microsoft.SqlServer.ManagedDTS
         Location: C:\Program Files (x86)\Microsoft SQL Server\100\SDK\Assemblies\Microsoft.SQLServer.ManagedDTS.dll
5. Microsoft.SqlServer.PipelineHost
         Location: C:\Program Files (x86)\Microsoft SQL Server\100\SDK\Assemblies\Microsoft.SqlServer.PipelineHost.dll

And then below is the code which will do our task as defined here.




using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Xml;

namespace IDS.DynamicSSIS
{
    class Program3
    {
        static void Main(string[] args)
        {
            Application app = new Application();
            Package package = new Package();

            #region Add Data Flow Task
            Executable dataFlowTask = package.Executables.Add("STOCK:PipelineTask");

            // Set the name (otherwise it will be a random GUID value)
            TaskHost taskHost = dataFlowTask as TaskHost;
            taskHost.Name = "Data Flow Task";

            // We need a reference to the InnerObject to add items to the data flow
            MainPipe pipeline = taskHost.InnerObject as MainPipe;
            #endregion Add Data Flow Task

            #region Add connection manager
            ConnectionManager connection = package.Connections.Add("OLEDB");
            connection.Name = "localhost";
            connection.ConnectionString = "Data Source=localhost;Initial Catalog=TestDB;Provider=SQLNCLI10.1;Integrated Security=SSPI;Auto Translate=False;";
            #endregion Add connection manager

            #region Add OLE DB Source Data Flow
            // Add OLEDB Source
            IDTSComponentMetaData100 srcComponent = pipeline.ComponentMetaDataCollection.New();
            srcComponent.ComponentClassID = "DTSAdapter.OleDbSource";
            srcComponent.ValidateExternalMetadata = true;
            IDTSDesigntimeComponent100 srcDesignTimeComponent = srcComponent.Instantiate();
            srcDesignTimeComponent.ProvideComponentProperties();
            srcComponent.Name = "OleDb Source";

            // Configure it to read from the given table
            srcDesignTimeComponent.SetComponentProperty("AccessMode", 2);// 2 - SQL Command
            srcDesignTimeComponent.SetComponentProperty("SqlCommand", "Select * from dbo.TestTableSource");

            // Set the connection manager
            srcComponent.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(connection);
            srcComponent.RuntimeConnectionCollection[0].ConnectionManagerID = connection.ID;

            // Retrieve the column metadata
            srcDesignTimeComponent.AcquireConnections(null);
            srcDesignTimeComponent.ReinitializeMetaData();
            srcDesignTimeComponent.ReleaseConnections();

            IDTSOutputColumnCollection100 sourceColumns = srcComponent.OutputCollection[0].OutputColumnCollection;

            #endregion Add OLE DB Source Data Flow

            #region lookup transform

            // Add transform
            IDTSComponentMetaData100 lookupComponent = pipeline.ComponentMetaDataCollection.New();
            lookupComponent.ComponentClassID = "DTSTransform.Lookup";
            lookupComponent.Name = "Lookup";

            CManagedComponentWrapper lookupWrapper = lookupComponent.Instantiate();
            lookupWrapper.ProvideComponentProperties();

            // Connect the source and the transform
            IDTSPath100 lookUpPath = pipeline.PathCollection.New();
            lookUpPath.AttachPathAndPropagateNotifications(srcComponent.OutputCollection[0], lookupComponent.InputCollection[0]);


            // Set the connection manager
            lookupComponent.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(connection);
            lookupComponent.RuntimeConnectionCollection[0].ConnectionManagerID = connection.ID;

            // Cache Type - Full = 0, Partial = 1, None = 2
            lookupWrapper.SetComponentProperty("CacheType", 0);
            lookupWrapper.SetComponentProperty("NoMatchBehavior", 1);// 1= Redirect rows to No Match output
            lookupWrapper.SetComponentProperty("SqlCommand", "select Name, ProductNumber from [dbo].[TestTableDest]");

            // initialize metadata
            lookupWrapper.AcquireConnections(null);
            lookupWrapper.ReinitializeMetaData();
            lookupWrapper.ReleaseConnections();

            // Mark the columns we are joining on
            IDTSInput100 lookupInput = lookupComponent.InputCollection[0];
            IDTSInputColumnCollection100 lookupInputColumns = lookupInput.InputColumnCollection;
            IDTSVirtualInput100 lookupVirtualInput = lookupInput.GetVirtualInput();
            IDTSVirtualInputColumnCollection100 lookupVirtualInputColumns = lookupVirtualInput.VirtualInputColumnCollection;

            // Note: join columns should be marked as READONLY
            var joinColumns = new string[] { "Name" };
            foreach (string columnName in joinColumns)
            {
                IDTSVirtualInputColumn100 virtualColumn = lookupVirtualInputColumns[columnName];
                IDTSInputColumn100 inputColumn = lookupWrapper.SetUsageType(lookupInput.ID, lookupVirtualInput, virtualColumn.LineageID, DTSUsageType.UT_READONLY);
                lookupWrapper.SetInputColumnProperty(lookupInput.ID, inputColumn.ID, "JoinToReferenceColumn", columnName);
            }

            // First output is the Match output
            IDTSOutput100 lookupMatchOutput = lookupComponent.OutputCollection[0];

            // Second output is the Un-Match output
            IDTSOutput100 lookupNoMatchOutput = lookupComponent.OutputCollection[1];

            #endregion lookup transform

            #region Add OLE DB Destination Data Flow for No Match Output
            // Add OLEDB Source
            IDTSComponentMetaData100 destNoMatchComponent = pipeline.ComponentMetaDataCollection.New();
            destNoMatchComponent.ComponentClassID = "DTSAdapter.OleDbDestination";
            destNoMatchComponent.ValidateExternalMetadata = true;

            IDTSDesigntimeComponent100 destNoMatchDesignTimeComponent = destNoMatchComponent.Instantiate();
            destNoMatchDesignTimeComponent.ProvideComponentProperties();
            destNoMatchComponent.Name = "OleDb Destination";

            // Configure it to read from the given table
            destNoMatchDesignTimeComponent.SetComponentProperty("AccessMode", 3);// 3 - OpenRowset
            destNoMatchDesignTimeComponent.SetComponentProperty("OpenRowset", "[dbo].[TestTableDest]");

            // Set the connection manager
            destNoMatchComponent.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(connection);
            destNoMatchComponent.RuntimeConnectionCollection[0].ConnectionManagerID = connection.ID;

            // Retrieve the column metadata
            destNoMatchDesignTimeComponent.AcquireConnections(null);
            destNoMatchDesignTimeComponent.ReinitializeMetaData();
            destNoMatchDesignTimeComponent.ReleaseConnections();

            #endregion Add OLE DB Destination Data Flow for No Match Output

            #region Add OLE DB Command Data Flow for Matching Output
            // Add OLEDB Source
            IDTSComponentMetaData100 destMatchComponent = pipeline.ComponentMetaDataCollection.New();
            destMatchComponent.ComponentClassID = "DTSTransform.OLEDBCommand";
            destMatchComponent.ValidateExternalMetadata = true;

            IDTSDesigntimeComponent100 destMatchDesignTimeComponent = destMatchComponent.Instantiate();
            destMatchDesignTimeComponent.ProvideComponentProperties();
            destMatchComponent.Name = "OleDb Command";

            // Configure it to read from the given table
            destMatchDesignTimeComponent.SetComponentProperty("CommandTimeout", 0);
            destMatchDesignTimeComponent.SetComponentProperty("SqlCommand", "UPDATE [dbo].[TestTableDest] SET [ProductNumber] = ? WHERE [Name] = ?");

            // Set the connection manager
            destMatchComponent.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(connection);
            destMatchComponent.RuntimeConnectionCollection[0].ConnectionManagerID = connection.ID;

            // Retrieve the column metadata
            destMatchDesignTimeComponent.AcquireConnections(null);
            destMatchDesignTimeComponent.ReinitializeMetaData();
            destMatchDesignTimeComponent.ReleaseConnections();

            #endregion Add OLE DB Command Data Flow for No Match Output

            #region Connect source and destination for No Match Outputs
            IDTSPath100 noMatchPath = pipeline.PathCollection.New();
            noMatchPath.AttachPathAndPropagateNotifications(lookupNoMatchOutput, destNoMatchComponent.InputCollection[0]);

            // Configure the destination
            IDTSInput100 destNoMatchInput = destNoMatchComponent.InputCollection[0];
            IDTSVirtualInput100 destNoMatchVirInput = destNoMatchInput.GetVirtualInput();
            IDTSInputColumnCollection100 destNoMatchInputCols = destNoMatchInput.InputColumnCollection;
            IDTSExternalMetadataColumnCollection100 destNoMatchExtCols = destNoMatchInput.ExternalMetadataColumnCollection;

            // The OLEDB destination requires you to hook up the external columns
            foreach (IDTSOutputColumn100 outputCol in sourceColumns)
            {
                // Get the external column id
                IDTSExternalMetadataColumn100 extCol = (IDTSExternalMetadataColumn100)destNoMatchExtCols[outputCol.Name];
                if (extCol != null)
                {
                    // Create an input column from an output col of previous component.
                    destNoMatchVirInput.SetUsageType(outputCol.ID, DTSUsageType.UT_READONLY);
                    IDTSInputColumn100 inputCol = destNoMatchInputCols.GetInputColumnByLineageID(outputCol.ID);
                    if (inputCol != null)
                    {
                        // map the input column with an external metadata column
                        destNoMatchDesignTimeComponent.MapInputColumn(destNoMatchInput.ID, inputCol.ID, extCol.ID);
                    }
                }
            }

            #endregion Connect source and destination for No Match Outputs

            #region Connect source and destination for Matching Outputs
            IDTSPath100 matchPath = pipeline.PathCollection.New();
            matchPath.AttachPathAndPropagateNotifications(lookupMatchOutput, destMatchComponent.InputCollection[0]);

            // Configure the destination
            IDTSInput100 destMatchInput = destMatchComponent.InputCollection[0];
            IDTSVirtualInput100 destMatchVirInput = destMatchInput.GetVirtualInput();
            IDTSInputColumnCollection100 destMatchInputCols = destMatchInput.InputColumnCollection;
            IDTSExternalMetadataColumnCollection100 destMatchExtCols = destMatchInput.ExternalMetadataColumnCollection;

            // The OLEDB destination requires you to hook up the external columns
            Dictionary<string, string> parameters = new Dictionary<string, string>();
            parameters.Add("Param_0", "ProductNumber");
            parameters.Add("Param_1", "Name");

            foreach (KeyValuePair<string, string> paramValue in parameters)
            {
                // Get the external column id
                IDTSExternalMetadataColumn100 extCol = (IDTSExternalMetadataColumn100)destMatchExtCols[paramValue.Key];
                if (extCol != null)
                {
                    foreach (IDTSOutputColumn100 outputCol in sourceColumns)
                    {
                        if (outputCol.Name == paramValue.Value)
                        {
                            // Create an input column from an output col of previous component.
                            destMatchVirInput.SetUsageType(outputCol.ID, DTSUsageType.UT_READONLY);
                            IDTSInputColumn100 inputCol = destMatchInputCols.GetInputColumnByLineageID(outputCol.ID);
                            if (inputCol != null)
                            {
                                // map the input column with an external metadata column
                                destMatchDesignTimeComponent.MapInputColumn(destMatchInput.ID, inputCol.ID, extCol.ID);
                            }
                        }
                    }
                }
            }

            #endregion Connect source and destination for No Match Outputs

            #region save package as xml
            string packageXML = @"C:\ORCC\POC\pkg1.dtsx";
            XmlDocument myPkgDocument = new XmlDocument();
            package.SaveToXML(ref myPkgDocument, null, null);
            package.SaveToXML(ref myPkgDocument, null, null);
            app.SaveToXml(packageXML, package, null);
            #endregion save package as xml

            package.Execute();

            Console.WriteLine("Completed");
            Console.ReadLine();
        }
    }
}


 



Now let me explain (in steps) what I have done here.
 
Step 1:Created a Data Flow Task. Written within "Add Data Flow Task" region.
 
Step 2: Creating connection manager to connect to source and destination database. To make example simple I used same database as a source as well as destination. Written within "Add connection manager" region.
 
Step 3: Created Oledb Source Data Flow control which will extract the data from the source location. Written within "Add OLE DB Source Data Flow" region.
 
Step 4: Created a lookup data flow control which will help to identify the new row and updated row by looking to destination database. Written within "lookup transform" region.
Here we need to mention which column you want to look in destination to identify rows. in my case I am using 'Name' column (the column which will be used here must be unique).
As a result lookup will give result as 'Match Column'(rows already exists in destination and need update) and 'No Match Column'(new rows and need insert). 

Step 5: Next I added a Oledb Destination Data flow control for No Match lookup output, to insert rows in destination. Written within " Add OLE DB Destination Data Flow for No Match Output" region.

Step 6: Than I added a Oledb Command Data flow control which will update the existing rows using update sql command. Written within " Add OLE DB Command Data Flow for Matching Output" region. Written within "Add OLE DB Command Data Flow for No Match Output" region.




Now we need to do mapping of columns for Oledb Destination and Oledb Command Data Flow control.

Step 7: Doing mapping of columns for Oledb Destination data flow control. written within "Connect source and destination for No Match Outputs" region.

Step 8: Doing mapping of columns for Oledb Command data flow control. written within "Connect source and destination for Matching Outputs" region.

Here the destination external column for Oledb Command will be auto generated with the name like Param_0, Param_1 ... etc, based on params required by sql command defined with '?' symbol in oledb command data flow control.

You have to create some logic to do this mapping, as column names are not same (refer below pic)



Step 9: Than at last saving the package to some location to use it later, using some job to update data periodically.

Finally if you try to open this package using SQL Server BI Dev Studio, it will look like below image here:




and here we are done.

Below are the links which I referred also suggest you the refer:

1. http://blogs.msdn.com/b/mattm/archive/2008/12/30/samples-for-creating-ssis-packages-programmatically.aspx
2. http://www.sqlis.com/sqlis/post/CreationName-2008.aspx.


Feel free to provide your valuable feedback/suggestion.

 (Excuse me for any typo or grammar mistake and wish you happy reading)
Thank You





4 comments:

  1. Thanks for sharing and nice article, SSIS is much limited to oledb connector. hopefully when MS moves to odbc as unversal driver and make enough support across all dev tools then SSIS can be considered an generic ETL tool perhaps.

    ReplyDelete
  2. Hi Binod - Thanks for this nice article. I've got an issue while creating the SSIS package - when adding connections to the package as in your code: ConnectionManager connection = package.Connections.Add("OLEDB"); . The exception is - Microsoft.SqlServer.Dts.Runtime.DtsRuntimeException: The connection type "OLEDB" specified for connection manager "{89241BBE-E5AD-4BEA-9309-7BA55E2DA871}" is not recognized as a valid connection manager type. This error is returned when an attempt is made to create a connection manager for an unknown connection type. Check the spelling in the connection type name. Can you please look at this issue?

    ReplyDelete
    Replies
    1. Hi Jeevan, Sorry for late reply. This code is fully tested and I do not face any such errors. Please have a look on dlls references once as I mentioned...

      In case if I get something related to your issue I'll reply here the same.

      Delete
  3. Hi Jivan, I had the same issue like you. The solution for me was, to the Assembly Microsoft.SQLServer.DTSRuntimeWrap at Embed Interop Types to True.

    ReplyDelete