Hi friends,
Today I am going to show you how to create an SSIS package programmatically in C#.
Before that, let's first discuss the problem statement, or scenario, where we might need this kind of solution.
Problem Statement: I need to extract data from multiple tables in a database and save it to a separate database for read-only use (for example, reporting). We want to do this to avoid putting load on the OLTP system. The requirement is to automate this process, so that whenever a new requirement comes in I can create a new table and load/update data in it without writing new code each time.
Solution: I created the destination table programmatically in the database, using the schema below. The next step is to load data into this table.
CREATE TABLE [dbo].[TestTableDest]
(
[ProductID] [int] IDENTITY(1,1) NOT NULL,
[Name] [varchar](50) NOT NULL,
[ProductNumber] [varchar](50) NOT NULL,
CONSTRAINT [IX_TestTableDest] UNIQUE NONCLUSTERED
(
[ProductID] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = ON, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
Now let's create the SSIS package, programmatically in C#, that loads data into this table. The requirement is to insert a row if it is new, or update the existing row if it already exists, based on a unique key value.
To do this, we need the following assembly references in our project:
1. Microsoft.SqlServer.ADONETSrc
Location: C:\Program Files (x86)\Microsoft SQL Server\100\DTS\PipelineComponents\Microsoft.SqlServer.ADONETSrc.dll
2. Microsoft.SqlServer.DTSPipelineWrap
Location: C:\Program Files (x86)\Microsoft SQL Server\100\SDK\Assemblies\Microsoft.SqlServer.DTSPipelineWrap.dll
3. Microsoft.SqlServer.DTSRuntimeWrap
Location: C:\Program Files (x86)\Microsoft SQL Server\100\SDK\Assemblies\Microsoft.SQLServer.DTSRuntimeWrap.dll
4. Microsoft.SqlServer.ManagedDTS
Location: C:\Program Files (x86)\Microsoft SQL Server\100\SDK\Assemblies\Microsoft.SQLServer.ManagedDTS.dll
5. Microsoft.SqlServer.PipelineHost
Location: C:\Program Files (x86)\Microsoft SQL Server\100\SDK\Assemblies\Microsoft.SqlServer.PipelineHost.dll
Below is the code that performs the task described above.
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime;
using System;
using System.Collections.Generic;
using System.Xml;

namespace IDS.DynamicSSIS
{
    class Program3
    {
        static void Main(string[] args)
        {
            Application app = new Application();
            Package package = new Package();

            #region Add Data Flow Task
            Executable dataFlowTask = package.Executables.Add("STOCK:PipelineTask");
            // Set the name (otherwise it will be a random GUID value)
            TaskHost taskHost = dataFlowTask as TaskHost;
            taskHost.Name = "Data Flow Task";
            // We need a reference to the InnerObject to add items to the data flow
            MainPipe pipeline = taskHost.InnerObject as MainPipe;
            #endregion Add Data Flow Task

            #region Add connection manager
            ConnectionManager connection = package.Connections.Add("OLEDB");
            connection.Name = "localhost";
            connection.ConnectionString = "Data Source=localhost;Initial Catalog=TestDB;Provider=SQLNCLI10.1;Integrated Security=SSPI;Auto Translate=False;";
            #endregion Add connection manager

            #region Add OLE DB Source Data Flow
            // Add the OLE DB Source
            IDTSComponentMetaData100 srcComponent = pipeline.ComponentMetaDataCollection.New();
            srcComponent.ComponentClassID = "DTSAdapter.OleDbSource";
            srcComponent.ValidateExternalMetadata = true;
            IDTSDesigntimeComponent100 srcDesignTimeComponent = srcComponent.Instantiate();
            srcDesignTimeComponent.ProvideComponentProperties();
            srcComponent.Name = "OleDb Source";
            // Configure it to read rows using a SQL command
            srcDesignTimeComponent.SetComponentProperty("AccessMode", 2); // 2 - SQL Command
            srcDesignTimeComponent.SetComponentProperty("SqlCommand", "Select * from dbo.TestTableSource");
            // Set the connection manager
            srcComponent.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(connection);
            srcComponent.RuntimeConnectionCollection[0].ConnectionManagerID = connection.ID;
            // Retrieve the column metadata
            srcDesignTimeComponent.AcquireConnections(null);
            srcDesignTimeComponent.ReinitializeMetaData();
            srcDesignTimeComponent.ReleaseConnections();
            IDTSOutputColumnCollection100 sourceColumns = srcComponent.OutputCollection[0].OutputColumnCollection;
            #endregion Add OLE DB Source Data Flow

            #region lookup transform
            // Add the Lookup transform
            IDTSComponentMetaData100 lookupComponent = pipeline.ComponentMetaDataCollection.New();
            lookupComponent.ComponentClassID = "DTSTransform.Lookup";
            lookupComponent.Name = "Lookup";
            CManagedComponentWrapper lookupWrapper = lookupComponent.Instantiate();
            lookupWrapper.ProvideComponentProperties();
            // Connect the source and the transform
            IDTSPath100 lookUpPath = pipeline.PathCollection.New();
            lookUpPath.AttachPathAndPropagateNotifications(srcComponent.OutputCollection[0], lookupComponent.InputCollection[0]);
            // Set the connection manager
            lookupComponent.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(connection);
            lookupComponent.RuntimeConnectionCollection[0].ConnectionManagerID = connection.ID;
            // Cache type: Full = 0, Partial = 1, None = 2
            lookupWrapper.SetComponentProperty("CacheType", 0);
            // 1 = Redirect rows to the No Match output
            lookupWrapper.SetComponentProperty("NoMatchBehavior", 1);
            lookupWrapper.SetComponentProperty("SqlCommand", "select Name, ProductNumber from [dbo].[TestTableDest]");
            // Initialize the metadata
            lookupWrapper.AcquireConnections(null);
            lookupWrapper.ReinitializeMetaData();
            lookupWrapper.ReleaseConnections();
            // Mark the columns we are joining on
            IDTSInput100 lookupInput = lookupComponent.InputCollection[0];
            IDTSInputColumnCollection100 lookupInputColumns = lookupInput.InputColumnCollection;
            IDTSVirtualInput100 lookupVirtualInput = lookupInput.GetVirtualInput();
            IDTSVirtualInputColumnCollection100 lookupVirtualInputColumns = lookupVirtualInput.VirtualInputColumnCollection;
            // Note: join columns should be marked as READONLY
            var joinColumns = new string[] { "Name" };
            foreach (string columnName in joinColumns)
            {
                IDTSVirtualInputColumn100 virtualColumn = lookupVirtualInputColumns[columnName];
                IDTSInputColumn100 inputColumn = lookupWrapper.SetUsageType(lookupInput.ID, lookupVirtualInput, virtualColumn.LineageID, DTSUsageType.UT_READONLY);
                lookupWrapper.SetInputColumnProperty(lookupInput.ID, inputColumn.ID, "JoinToReferenceColumn", columnName);
            }
            // The first output is the Match output
            IDTSOutput100 lookupMatchOutput = lookupComponent.OutputCollection[0];
            // The second output is the No Match output
            IDTSOutput100 lookupNoMatchOutput = lookupComponent.OutputCollection[1];
            #endregion lookup transform

            #region Add OLE DB Destination Data Flow for No Match Output
            // Add the OLE DB Destination
            IDTSComponentMetaData100 destNoMatchComponent = pipeline.ComponentMetaDataCollection.New();
            destNoMatchComponent.ComponentClassID = "DTSAdapter.OleDbDestination";
            destNoMatchComponent.ValidateExternalMetadata = true;
            IDTSDesigntimeComponent100 destNoMatchDesignTimeComponent = destNoMatchComponent.Instantiate();
            destNoMatchDesignTimeComponent.ProvideComponentProperties();
            destNoMatchComponent.Name = "OleDb Destination";
            // Configure it to write to the given table
            destNoMatchDesignTimeComponent.SetComponentProperty("AccessMode", 3); // 3 - OpenRowset Using FastLoad
            destNoMatchDesignTimeComponent.SetComponentProperty("OpenRowset", "[dbo].[TestTableDest]");
            // Set the connection manager
            destNoMatchComponent.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(connection);
            destNoMatchComponent.RuntimeConnectionCollection[0].ConnectionManagerID = connection.ID;
            // Retrieve the column metadata
            destNoMatchDesignTimeComponent.AcquireConnections(null);
            destNoMatchDesignTimeComponent.ReinitializeMetaData();
            destNoMatchDesignTimeComponent.ReleaseConnections();
            #endregion Add OLE DB Destination Data Flow for No Match Output

            #region Add OLE DB Command Data Flow for Matching Output
            // Add the OLE DB Command transform
            IDTSComponentMetaData100 destMatchComponent = pipeline.ComponentMetaDataCollection.New();
            destMatchComponent.ComponentClassID = "DTSTransform.OLEDBCommand";
            destMatchComponent.ValidateExternalMetadata = true;
            IDTSDesigntimeComponent100 destMatchDesignTimeComponent = destMatchComponent.Instantiate();
            destMatchDesignTimeComponent.ProvideComponentProperties();
            destMatchComponent.Name = "OleDb Command";
            // Configure the UPDATE command; each '?' becomes a parameter (Param_0, Param_1, ...)
            destMatchDesignTimeComponent.SetComponentProperty("CommandTimeout", 0);
            destMatchDesignTimeComponent.SetComponentProperty("SqlCommand", "UPDATE [dbo].[TestTableDest] SET [ProductNumber] = ? WHERE [Name] = ?");
            // Set the connection manager
            destMatchComponent.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(connection);
            destMatchComponent.RuntimeConnectionCollection[0].ConnectionManagerID = connection.ID;
            // Retrieve the column metadata
            destMatchDesignTimeComponent.AcquireConnections(null);
            destMatchDesignTimeComponent.ReinitializeMetaData();
            destMatchDesignTimeComponent.ReleaseConnections();
            #endregion Add OLE DB Command Data Flow for Matching Output

            #region Connect source and destination for No Match Outputs
            IDTSPath100 noMatchPath = pipeline.PathCollection.New();
            noMatchPath.AttachPathAndPropagateNotifications(lookupNoMatchOutput, destNoMatchComponent.InputCollection[0]);
            // Configure the destination
            IDTSInput100 destNoMatchInput = destNoMatchComponent.InputCollection[0];
            IDTSVirtualInput100 destNoMatchVirInput = destNoMatchInput.GetVirtualInput();
            IDTSInputColumnCollection100 destNoMatchInputCols = destNoMatchInput.InputColumnCollection;
            IDTSExternalMetadataColumnCollection100 destNoMatchExtCols = destNoMatchInput.ExternalMetadataColumnCollection;
            // The OLE DB destination requires you to hook up the external columns
            foreach (IDTSOutputColumn100 outputCol in sourceColumns)
            {
                // Get the external (table) column with the same name
                IDTSExternalMetadataColumn100 extCol = (IDTSExternalMetadataColumn100)destNoMatchExtCols[outputCol.Name];
                if (extCol != null)
                {
                    // Create an input column from an output column of the upstream component
                    destNoMatchVirInput.SetUsageType(outputCol.LineageID, DTSUsageType.UT_READONLY);
                    IDTSInputColumn100 inputCol = destNoMatchInputCols.GetInputColumnByLineageID(outputCol.LineageID);
                    if (inputCol != null)
                    {
                        // Map the input column to the external metadata column
                        destNoMatchDesignTimeComponent.MapInputColumn(destNoMatchInput.ID, inputCol.ID, extCol.ID);
                    }
                }
            }
            #endregion Connect source and destination for No Match Outputs

            #region Connect source and destination for Matching Outputs
            IDTSPath100 matchPath = pipeline.PathCollection.New();
            matchPath.AttachPathAndPropagateNotifications(lookupMatchOutput, destMatchComponent.InputCollection[0]);
            // Configure the OLE DB Command
            IDTSInput100 destMatchInput = destMatchComponent.InputCollection[0];
            IDTSVirtualInput100 destMatchVirInput = destMatchInput.GetVirtualInput();
            IDTSInputColumnCollection100 destMatchInputCols = destMatchInput.InputColumnCollection;
            IDTSExternalMetadataColumnCollection100 destMatchExtCols = destMatchInput.ExternalMetadataColumnCollection;
            // The external columns are the command parameters, so map each parameter to a source column
            Dictionary<string, string> parameters = new Dictionary<string, string>();
            parameters.Add("Param_0", "ProductNumber");
            parameters.Add("Param_1", "Name");
            foreach (KeyValuePair<string, string> paramValue in parameters)
            {
                // Get the external column for this parameter
                IDTSExternalMetadataColumn100 extCol = (IDTSExternalMetadataColumn100)destMatchExtCols[paramValue.Key];
                if (extCol != null)
                {
                    foreach (IDTSOutputColumn100 outputCol in sourceColumns)
                    {
                        if (outputCol.Name == paramValue.Value)
                        {
                            // Create an input column from an output column of the upstream component
                            destMatchVirInput.SetUsageType(outputCol.LineageID, DTSUsageType.UT_READONLY);
                            IDTSInputColumn100 inputCol = destMatchInputCols.GetInputColumnByLineageID(outputCol.LineageID);
                            if (inputCol != null)
                            {
                                // Map the input column to the parameter's external metadata column
                                destMatchDesignTimeComponent.MapInputColumn(destMatchInput.ID, inputCol.ID, extCol.ID);
                            }
                        }
                    }
                }
            }
            #endregion Connect source and destination for Matching Outputs

            #region save package as xml
            string packageXML = @"C:\ORCC\POC\pkg1.dtsx";
            // Optional: get the package XML in memory as well
            XmlDocument myPkgDocument = new XmlDocument();
            package.SaveToXML(ref myPkgDocument, null, null);
            // Save the package as a .dtsx file so that it can be run later
            app.SaveToXml(packageXML, package, null);
            #endregion save package as xml

            // Run the package once and report the result
            DTSExecResult result = package.Execute();
            Console.WriteLine("Completed: " + result);
            Console.ReadLine();
        }
    }
}
Now let me explain, step by step, what I have done here.
Step 1: Created a Data Flow Task (written within the "Add Data Flow Task" region).
Step 2: Created a connection manager to connect to the source and destination database. To keep the example simple, I used the same database as both source and destination (written within the "Add connection manager" region).
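Since the goal is to avoid writing new code for every new table, it can help to build the connection string from parameters instead of a hard-coded literal. The sketch below is only an illustration under assumptions: the ConnectionStringSketch class and its method are hypothetical names, and it uses the standard System.Data.Common.DbConnectionStringBuilder with the same keywords as the article's OLE DB connection string.

```csharp
using System.Data.Common;

// Hypothetical helper (not part of SSIS): builds the OLE DB connection string
// for the "localhost" connection manager from a server and database name.
public static class ConnectionStringSketch
{
    public static string BuildOleDbConnectionString(string server, string database)
    {
        var builder = new DbConnectionStringBuilder();
        builder["Data Source"] = server;
        builder["Initial Catalog"] = database;
        builder["Provider"] = "SQLNCLI10.1";      // same provider as in the article
        builder["Integrated Security"] = "SSPI";  // Windows authentication
        builder["Auto Translate"] = "False";
        // The builder joins the pairs with ';' and quotes values when needed.
        return builder.ConnectionString;
    }
}
```

Usage: connection.ConnectionString = ConnectionStringSketch.BuildOleDbConnectionString("localhost", "TestDB"); A builder also takes care of quoting values that contain characters such as ';', which plain string concatenation does not.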
Step 3: Created an OLE DB Source data flow component, which extracts the data from the source (written within the "Add OLE DB Source Data Flow" region).
Step 4: Created a Lookup data flow component, which identifies new and existing rows by looking them up in the destination database (written within the "lookup transform" region).
Here we specify which column to look up in the destination to identify rows. In my case I am using the 'Name' column (the column used here must be unique).
As a result, the Lookup sends each row either to the 'Match' output (rows that already exist in the destination and need an update) or to the 'No Match' output (new rows that need an insert).
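Conceptually, a full-cache Lookup (CacheType = 0) with NoMatchBehavior = 1 loads the reference column into memory and splits the incoming rows in two. The following sketch reproduces only that logic in plain C#, to make the Match/No Match split concrete; it is not how SSIS implements the transform, and the ProductRow and LookupSplitSketch names are hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical row type mirroring the columns of dbo.TestTableDest.
public sealed class ProductRow
{
    public int ProductID { get; set; }
    public string Name { get; set; }
    public string ProductNumber { get; set; }
}

public static class LookupSplitSketch
{
    // Mimics a full-cache lookup joined on Name: rows whose Name already exists
    // in the reference set are "matched", all other rows are "no match".
    public static (List<ProductRow> Matched, List<ProductRow> NoMatch) Split(
        IEnumerable<ProductRow> sourceRows, IEnumerable<string> existingNames)
    {
        // Full cache: load the whole reference column into memory once
        // (exact, case-sensitive comparison).
        var cache = new HashSet<string>(existingNames, StringComparer.Ordinal);
        var matched = new List<ProductRow>();
        var noMatch = new List<ProductRow>();
        foreach (ProductRow row in sourceRows)
        {
            if (cache.Contains(row.Name))
                matched.Add(row);   // existing row: goes to the UPDATE (OLE DB Command)
            else
                noMatch.Add(row);   // new row: goes to the INSERT (OLE DB Destination)
        }
        return (matched, noMatch);
    }
}
```

This is also why the join column should be unique in the destination: the lookup only answers "does this key exist", so duplicate keys would make the UPDATE touch more than one row.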
Step 5: Next, I added an OLE DB Destination data flow component for the Lookup's No Match output, to insert rows into the destination (written within the "Add OLE DB Destination Data Flow for No Match Output" region).
Step 6: Then I added an OLE DB Command data flow component, which updates the existing rows using an UPDATE SQL command (written within the "Add OLE DB Command Data Flow for Matching Output" region).
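To support new tables without writing new code each time, the UPDATE statement given to the OLE DB Command can itself be generated. Here is a minimal sketch, assuming the columns to update and the key column are already known (for example, read from your own table metadata); UpdateCommandSketch and its methods are hypothetical names, and identifiers are quoted with square brackets as in the article's SQL.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: builds an OLE DB Command UPDATE statement for any table.
public static class UpdateCommandSketch
{
    // Quote an identifier with square brackets, escaping any closing bracket.
    public static string Quote(string name) => "[" + name.Replace("]", "]]") + "]";

    public static string BuildUpdateCommand(string schema, string table,
        IList<string> setColumns, string keyColumn)
    {
        if (setColumns == null || setColumns.Count == 0)
            throw new ArgumentException("At least one column to update is required.");

        // One "[column] = ?" per updated column; the key column is the last '?'.
        string setClause = string.Join(", ", setColumns.Select(c => Quote(c) + " = ?"));
        return "UPDATE " + Quote(schema) + "." + Quote(table) +
               " SET " + setClause +
               " WHERE " + Quote(keyColumn) + " = ?";
    }
}
```

For the article's table, BuildUpdateCommand("dbo", "TestTableDest", new[] { "ProductNumber" }, "Name") returns exactly the command used in the code: UPDATE [dbo].[TestTableDest] SET [ProductNumber] = ? WHERE [Name] = ?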
Now we need to map the columns for the OLE DB Destination and OLE DB Command data flow components.
Step 7: Mapped the columns for the OLE DB Destination component (written within the "Connect source and destination for No Match Outputs" region).
Step 8: Mapped the columns for the OLE DB Command component (written within the "Connect source and destination for Matching Outputs" region).
Here the external (destination) columns of the OLE DB Command are generated automatically, with names like Param_0, Param_1, and so on, one for each '?' parameter in the component's SQL command. You have to write some logic for this mapping, because these parameter names are not the same as the column names (refer to the picture below).
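One possible way to build the Param_N mapping, instead of hard-coding the parameters dictionary, is to derive it from the SQL command text. This is a sketch under assumptions: OleDbCommandParamMapper is a hypothetical helper, every '?' is assumed to appear in the form column = ? (or [column] = ?), '?' is assumed not to occur inside string literals, and parameters are assumed to be numbered left to right, as in the article's example (Param_0 for ProductNumber, Param_1 for Name).

```csharp
using System;
using System.Collections.Generic;
using System.Text.RegularExpressions;

// Hypothetical helper (not part of the SSIS API): builds the "Param_N" -> column
// map for an OLE DB Command from its SQL command text.
public static class OleDbCommandParamMapper
{
    // Matches "column = ?" or "[column] = ?" and captures the column name.
    private static readonly Regex ParamPattern =
        new Regex(@"\[?(\w+)\]?\s*=\s*\?", RegexOptions.Compiled);

    public static Dictionary<string, string> BuildParameterMap(string sqlCommand)
    {
        var map = new Dictionary<string, string>();
        int index = 0;
        // Matches come back left to right, i.e. in placeholder order.
        foreach (Match m in ParamPattern.Matches(sqlCommand))
        {
            map.Add("Param_" + index, m.Groups[1].Value);
            index++;
        }

        // Every '?' must have been recognized, otherwise the mapping is incomplete.
        int placeholderCount = 0;
        foreach (char c in sqlCommand)
        {
            if (c == '?') placeholderCount++;
        }
        if (placeholderCount != index)
        {
            throw new ArgumentException(
                "Found " + placeholderCount + " '?' placeholders but could map only " + index + ".");
        }
        return map;
    }
}
```

Usage: Dictionary<string, string> parameters = OleDbCommandParamMapper.BuildParameterMap(sqlCommand); where sqlCommand is the same string passed to the "SqlCommand" property. For the article's UPDATE statement this yields Param_0 -> ProductNumber and Param_1 -> Name, the same pairs the code adds by hand. Throwing on unrecognized placeholders (for example, Name IN (?)) is a deliberate choice, so that a silently incomplete mapping does not reach the package.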
Step 9: Finally, I saved the package to a location so that it can be used later, for example by a job that updates the data periodically.
If you open this package in SQL Server Business Intelligence Development Studio, it looks like the image below:
And we are done.
Below are the links I referred to, which I also suggest you read:
1. http://blogs.msdn.com/b/mattm/archive/2008/12/30/samples-for-creating-ssis-packages-programmatically.aspx
2. http://www.sqlis.com/sqlis/post/CreationName-2008.aspx
Feel free to share your feedback or suggestions.
(Please excuse any typos or grammar mistakes, and happy reading!)
Thank you
Thanks for sharing, nice article. SSIS is largely limited to the OLE DB connector. Hopefully, once MS moves to ODBC as the universal driver and supports it well across all dev tools, SSIS can be considered a generic ETL tool.
Hi Binod - Thanks for this nice article. I've got an issue while creating the SSIS package, when adding connections to the package as in your code: ConnectionManager connection = package.Connections.Add("OLEDB"); The exception is: Microsoft.SqlServer.Dts.Runtime.DtsRuntimeException: The connection type "OLEDB" specified for connection manager "{89241BBE-E5AD-4BEA-9309-7BA55E2DA871}" is not recognized as a valid connection manager type. This error is returned when an attempt is made to create a connection manager for an unknown connection type. Check the spelling in the connection type name. Can you please look at this issue?
Hi Jeevan, sorry for the late reply. This code is fully tested and I did not face any such errors. Please check the DLL references I mentioned once more.
If I find anything related to your issue, I'll reply here.
Hi Jivan, I had the same issue. The solution for me was to set Embed Interop Types to True on the Microsoft.SQLServer.DTSRuntimeWrap assembly reference.
I am getting this exception: System.Runtime.InteropServices.COMException: 'Exception from HRESULT: 0xC0048021'